Author: jstrachan
Date: Fri Mar 17 03:14:11 2006
New Revision: 386608
URL: http://svn.apache.org/viewcvs?rev=386608&view=rev
Log:
Patch for AMQ-600 to catch IOExceptions caused by attempts to dispatch
synchronously to a connection on a dead socket and treat them as transport
exceptions (rather than service exceptions), disposing the connection so that
clientIDs can be reused.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=386608&r1=386607&r2=386608&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Mar 17 03:14:11 2006
@@ -72,6 +72,7 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
@@ -85,7 +86,7 @@
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-public class ActiveMQConnection extends DefaultTransportListener implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection {
+public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
@@ -130,6 +131,7 @@
private final CopyOnWriteArrayList connectionConsumers = new
CopyOnWriteArrayList();
private final CopyOnWriteArrayList inputStreams = new
CopyOnWriteArrayList();
private final CopyOnWriteArrayList outputStreams = new
CopyOnWriteArrayList();
+ private final CopyOnWriteArrayList transportListeners = new CopyOnWriteArrayList();
// Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap dispatchers = new ConcurrentHashMap();
@@ -147,7 +149,6 @@
private IOException firstFailureError;
-
/**
* Construct an <code>ActiveMQConnection</code>
* @param transport
@@ -790,6 +791,17 @@
this.useRetroactiveConsumer = useRetroactiveConsumer;
}
+ /**
+ * Adds a transport listener so that a client can be notified of events in the underlying
+ * transport
+ */
+ public void addTransportListener(TransportListener transportListener) {
+ transportListeners.add(transportListener);
+ }
+
+ public void removeTransportListener(TransportListener transportListener) {
+ transportListeners.remove(transportListener);
+ }
// Implementation methods
// -------------------------------------------------------------------------
@@ -1175,7 +1187,7 @@
*/
protected void ensureConnectionInfoSent() throws JMSException {
// Can we skip sending the ConnectionInfo packet??
- if (isConnectionInfoSentToBroker) {
+ if (isConnectionInfoSentToBroker || closed.get()) {
return;
}
@@ -1241,7 +1253,7 @@
}
if(isConnectionInfoSentToBroker){
- if(!transportFailed.get()){
+ if(!transportFailed.get() && !closing.get()){
asyncSendPacket(info.createRemoveCommand());
}
isConnectionInfoSentToBroker=false;
@@ -1368,6 +1380,10 @@
onAsyncException(((ConnectionError)command).getException());
}
}
+ for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+ TransportListener listener = (TransportListener) iter.next();
+ listener.onCommand(command);
+ }
}
/**
@@ -1386,14 +1402,33 @@
}
}
}
-
public void onException(IOException error) {
onAsyncException(error);
transportFailed(error);
ServiceSupport.dispose(this.transport);
brokerInfoReceived.countDown();
+
+ for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+ TransportListener listener = (TransportListener) iter.next();
+ listener.onException(error);
+ }
+ }
+
+ public void transportInterupted() {
+ for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+ TransportListener listener = (TransportListener) iter.next();
+ listener.transportInterupted();
+ }
}
+
+ public void transportResumed() {
+ for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+ TransportListener listener = (TransportListener) iter.next();
+ listener.transportResumed();
+ }
+ }
+
/**
* Create the DestinationInfo object for the temporary destination.