Whoops, wrong commit message - I got my patches to AbstractConnection and ActiveMQConnection mixed up :)
This patch actually contained two different things: * a fix for AMQ-623 to ensure that we don't try to send further commands after an EOFException has been caught * an implementation of AMQ-642 to allow folks to add a TransportListener to an ActiveMQConnection to listen for connection events such as start/stop, suspend/resume, etc. On 3/17/06, [EMAIL PROTECTED] <[EMAIL PROTECTED]> wrote: > > Author: jstrachan > Date: Fri Mar 17 03:14:11 2006 > New Revision: 386608 > > URL: http://svn.apache.org/viewcvs?rev=386608&view=rev > Log: > patch for AMQ-600 to catch IOException caused by attempts to dispatch > synchronously to a connection on a dead socket and treat them as a transport > exception (rather than service exception), disposing the connection so that > clientID's can be reused > > Modified: > > > incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java > > Modified: > incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java > URL: > http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=386608&r1=386607&r2=386608&view=diff > > ============================================================================== > --- > incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java > (original) > +++ > incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java > Fri Mar 17 03:14:11 2006 > @@ -72,6 +72,7 @@ > import org.apache.activemq.thread.TaskRunnerFactory; > import org.apache.activemq.transport.DefaultTransportListener; > import org.apache.activemq.transport.Transport; > +import org.apache.activemq.transport.TransportListener; > import org.apache.activemq.util.IdGenerator; > import org.apache.activemq.util.IntrospectionSupport; > import org.apache.activemq.util.JMSExceptionSupport; > @@ -85,7 +86,7 @@ > import 
edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean > ; > > > -public class ActiveMQConnection extends DefaultTransportListener > implements Connection, TopicConnection, QueueConnection, StatsCapable, > Closeable, StreamConnection { > +public class ActiveMQConnection implements Connection, TopicConnection, > QueueConnection, StatsCapable, Closeable, StreamConnection, > TransportListener { > > public static final TaskRunnerFactory SESSION_TASK_RUNNER = new > TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION > ,true,1000); > > @@ -130,6 +131,7 @@ > private final CopyOnWriteArrayList connectionConsumers = new > CopyOnWriteArrayList(); > private final CopyOnWriteArrayList inputStreams = new > CopyOnWriteArrayList(); > private final CopyOnWriteArrayList outputStreams = new > CopyOnWriteArrayList(); > + private final CopyOnWriteArrayList transportListeners = new > CopyOnWriteArrayList(); > > // Maps ConsumerIds to ActiveMQConsumer objects > private final ConcurrentHashMap dispatchers = new > ConcurrentHashMap(); > @@ -147,7 +149,6 @@ > private IOException firstFailureError; > > > - > /** > * Construct an <code>ActiveMQConnection</code> > * @param transport > @@ -790,6 +791,17 @@ > this.useRetroactiveConsumer = useRetroactiveConsumer; > } > > + /** > + * Adds a transport listener so that a client can be notified of > events in the underlying > + * transport > + */ > + public void addTransportListener(TransportListener transportListener) > { > + transportListeners.add(transportListener); > + } > + > + public void removeTransportListener(TransportListener > transportListener) { > + transportListeners.remove(transportListener); > + } > > // Implementation methods > // > ------------------------------------------------------------------------- > @@ -1175,7 +1187,7 @@ > */ > protected void ensureConnectionInfoSent() throws JMSException { > // Can we skip sending the ConnectionInfo packet?? 
> - if (isConnectionInfoSentToBroker) { > + if (isConnectionInfoSentToBroker || closed.get()) { > return; > } > > @@ -1241,7 +1253,7 @@ > } > > if(isConnectionInfoSentToBroker){ > - if(!transportFailed.get()){ > + if(!transportFailed.get() && !closing.get()){ > asyncSendPacket(info.createRemoveCommand()); > } > isConnectionInfoSentToBroker=false; > @@ -1368,6 +1380,10 @@ > > onAsyncException(((ConnectionError)command).getException()); > } > } > + for (Iterator iter = transportListeners.iterator(); iter.hasNext();) > { > + TransportListener listener = (TransportListener) iter.next(); > + listener.onCommand(command); > + } > } > > /** > @@ -1386,14 +1402,33 @@ > } > } > } > - > > public void onException(IOException error) { > onAsyncException(error); > transportFailed(error); > ServiceSupport.dispose(this.transport); > brokerInfoReceived.countDown(); > + > + for (Iterator iter = transportListeners.iterator(); iter.hasNext();) > { > + TransportListener listener = (TransportListener) iter.next(); > + listener.onException(error); > + } > + } > + > + public void transportInterupted() { > + for (Iterator iter = transportListeners.iterator(); iter.hasNext();) > { > + TransportListener listener = (TransportListener) iter.next(); > + listener.transportInterupted(); > + } > } > + > + public void transportResumed() { > + for (Iterator iter = transportListeners.iterator(); iter.hasNext();) > { > + TransportListener listener = (TransportListener) iter.next(); > + listener.transportResumed(); > + } > + } > + > > /** > * Create the DestinationInfo object for the temporary destination. > > > -- James ------- http://radio.weblogs.com/0112098/
