Author: chirino
Date: Wed Mar 19 18:59:39 2008
New Revision: 639111
URL: http://svn.apache.org/viewvc?rev=639111&view=rev
Log:
If a connection is close() make sure we error out any outstanding requests it
may be doing so that it can properly shutdown.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=639111&r1=639110&r2=639111&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
Wed Mar 19 18:59:39 2008
@@ -42,6 +42,7 @@
private final Map<Integer, FutureResponse> requestMap = new
HashMap<Integer, FutureResponse>();
private IntSequenceGenerator sequenceGenerator;
private final boolean debug = LOG.isDebugEnabled();
+ private IOException error;
public ResponseCorrelator(Transport next) {
this(next, new IntSequenceGenerator());
@@ -65,6 +66,9 @@
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback);
synchronized (requestMap) {
+ if( this.error !=null ) {
+ throw error;
+ }
requestMap.put(new Integer(command.getCommandId()), future);
}
next.oneway(command);
@@ -106,14 +110,31 @@
* any of current requests. Lets let them know of the problem.
*/
public void onException(IOException error) {
- // Copy and Clear the request Map
- ArrayList<FutureResponse> requests = new
ArrayList<FutureResponse>(requestMap.values());
- requestMap.clear();
- for (Iterator<FutureResponse> iter = requests.iterator();
iter.hasNext();) {
- FutureResponse fr = iter.next();
- fr.set(new ExceptionResponse(error));
- }
+ dispose(error);
super.onException(error);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ dispose(new IOException("Stopped."));
+ super.stop();
+ }
+
+ private void dispose(IOException error) {
+ ArrayList<FutureResponse> requests=null;
+ synchronized(requestMap) {
+ if( this.error==null) {
+ this.error = error;
+ requests = new ArrayList<FutureResponse>(requestMap.values());
+ requestMap.clear();
+ }
+ }
+ if( requests!=null ) {
+ for (Iterator<FutureResponse> iter = requests.iterator();
iter.hasNext();) {
+ FutureResponse fr = iter.next();
+ fr.set(new ExceptionResponse(error));
+ }
+ }
}
public IntSequenceGenerator getSequenceGenerator() {