Author: chirino
Date: Mon Mar 24 16:19:50 2008
New Revision: 640641
URL: http://svn.apache.org/viewvc?rev=640641&view=rev
Log:
Better failover error handling, and now we pass on the max initial inactivity
timeout to the timeout used by the initial wire format negotiation.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=640641&r1=640640&r2=640641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
Mon Mar 24 16:19:50 2008
@@ -57,6 +57,13 @@
minimumVersion = 1;
}
this.minimumVersion = minimumVersion;
+
+ // Setup the initial negociation timeout to be the same as the inital
max inactivity delay specified on the wireformat
+ // Does not make sense for us to take longer.
+ try {
+
setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
+ } catch (IOException e) {
+ }
}
public void start() throws Exception {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=640641&r1=640640&r2=640641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Mon Mar 24 16:19:50 2008
@@ -94,7 +94,7 @@
private int backupPoolSize=1;
private boolean trackMessages = false;
private int maxCacheSize = 128 * 1024;
- private TransportListener disposedListener = new
DefaultTransportListener();
+ private TransportListener disposedListener = new
DefaultTransportListener() {};
private final TransportListener myTransportListener =
createTransportListener();
@@ -189,42 +189,33 @@
public final void handleTransportFailure(IOException e) throws
InterruptedException {
- Transport transport = connectedTransport.get();
+ Transport transport = connectedTransport.getAndSet(null);
if( transport!=null ) {
+
+ transport.setTransportListener(disposedListener);
ServiceSupport.dispose(transport);
- }
-
- boolean wasConnected=false;
- synchronized (reconnectMutex) {
- boolean reconnectOk = false;
- if(started) {
- LOG.warn("Transport failed, attempting to automatically
reconnect due to: " + e);
- LOG.debug("Transport failed with the following exception:", e);
- reconnectOk = true;
- }
- if (connectedTransport.get() != null) {
- wasConnected=true;
+ synchronized (reconnectMutex) {
+ boolean reconnectOk = false;
+ if(started) {
+ LOG.warn("Transport failed, attempting to automatically
reconnect due to: " + e);
+ LOG.debug("Transport failed with the following
exception:", e);
+ reconnectOk = true;
+ }
+
initialized = false;
failedConnectTransportURI=connectedTransportURI;
- Transport old = connectedTransport.get();
- if(old != null) {
- //don't want errors from old transport
- old.setTransportListener(disposedListener);
- }
- connectedTransport.set(null);
connectedTransportURI = null;
connected=false;
+
+ if(reconnectOk) {
+ reconnectTask.wakeup();
+ }
+ }
+
+ if (transportListener != null) {
+ transportListener.transportInterupted();
}
-
- if(reconnectOk) {
- reconnectTask.wakeup();
- }
- }
-
- // Avoid double firing a transportInterupted() event due to an extra
IOException
- if (transportListener != null && wasConnected) {
- transportListener.transportInterupted();
}
}