Author: grkvlt
Date: Thu Mar  3 01:56:07 2011
New Revision: 1076492

URL: http://svn.apache.org/viewvc?rev=1076492&view=rev
Log:
QPID-3008: fix 0-10 failover mechanism properly

Modified:
    
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1076492&r1=1076491&r2=1076492&view=diff
==============================================================================
--- 
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 (original)
+++ 
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 Thu Mar  3 01:56:07 2011
@@ -31,15 +31,19 @@ import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverHandler;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverState;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionClose;
 import org.apache.qpid.transport.ConnectionException;
@@ -201,6 +205,7 @@ public class AMQConnectionDelegate_0_10 
 
     public void failoverPrep()
     {
+        _logger.info("Preparing failover");
         List<AMQSession> sessions = new 
ArrayList<AMQSession>(_conn.getSessions().values());
         for (AMQSession s : sessions)
         {
@@ -210,6 +215,8 @@ public class AMQConnectionDelegate_0_10 
 
     public void resubscribeSessions() throws JMSException, AMQException, 
FailoverException
     {
+        _logger.info("Resuming connection");
+        getQpidConnection().resume();
         List<AMQSession> sessions = new 
ArrayList<AMQSession>(_conn.getSessions().values());
         _logger.info(String.format("Resubscribing sessions = %s 
sessions.size=%d", sessions, sessions.size()));
         for (AMQSession s : sessions)
@@ -255,32 +262,84 @@ public class AMQConnectionDelegate_0_10 
         ConnectionClose close = exc.getClose();
         if (close == null)
         {
-            _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-            
-            try
+            startFailoverThread();
+        }
+        else
+        {
+            _conn.exceptionReceived(exc);
+        }
+    }
+
+
+    /** See {@link FailoverHandler} for the rationale behind using a separate thread. */
+    private void startFailoverThread()
+    {
+        final Thread failoverThread;
+        try
+        {
+            failoverThread = Threading.getThreadFactory().createThread(new 
Runnable()
             {
-                if (_conn.firePreFailover(false) && 
_conn.attemptReconnection())
+                public void run()
                 {
-                    _conn.failoverPrep();
-                    _conn.resubscribeSessions();
-                    _conn.fireFailoverComplete();
-                    return;
+                    _conn.getProtocolHandler().setFailoverLatch(new 
CountDownLatch(1));
+
+                    synchronized (_conn.getFailoverMutex())
+                    {
+                        try
+                        {
+                            if (!_conn.firePreFailover(false))
+                            {
+                                _logger.info("Failover process veto-ed by 
client");
+                                _conn.exceptionReceived(new 
AMQDisconnectedException("Failover was vetoed by client", null));
+                                return;
+                            }
+
+                            if (_conn.attemptReconnection())
+                            {
+                                _conn.failoverPrep();
+                                if (_conn.firePreResubscribe())
+                                {
+                                    _logger.info("Resubscribing on new 
connection");
+                                    _conn.resubscribeSessions();
+                                }
+                                else
+                                {
+                                    _logger.info("Client vetoed automatic 
resubscription");
+                                }
+
+                                _conn.fireFailoverComplete();
+                                _logger.info("Connection failover completed 
successfully");
+                            }
+                            else
+                            {
+                                _conn.exceptionReceived(new 
AMQDisconnectedException("Server closed connection and no failover was 
successful", null));
+                            }
+                            return;
+                        }
+                        catch (Exception e)
+                        {
+                            _logger.info("Failover process failed - exception 
being propagated by protocol handler", e);
+                            _conn.exceptionReceived(e);
+                        }
+                        finally
+                        {
+                            
_conn.getProtocolHandler().getFailoverLatch().countDown();
+                            _conn.getProtocolHandler().setFailoverLatch(null);
+                        }
+                    }
                 }
-            }
-            catch (Exception e)
-            {
-                _logger.error("error during failover", e);
-            }
-            finally
-            {
-                _conn.getProtocolHandler().getFailoverLatch().countDown();
-                _conn.getProtocolHandler().setFailoverLatch(null);
-            }
+            });
         }
-        else
+        catch (Exception e)
         {
-            _conn.exceptionReceived(exc);
+            throw new RuntimeException("Failed to create thread", e);
         }
+
+        failoverThread.setName("Failover");
+        // Do not inherit daemon-ness from current thread as this can be a 
daemon
+        // thread such as an AnonymousIoService thread.
+        failoverThread.setDaemon(false);
+        failoverThread.start();
     }
 
     public <T, E extends Exception> T 
executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E

Modified: 
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1076492&r1=1076491&r2=1076492&view=diff
==============================================================================
--- 
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/branches/grkvlt-network-20110301/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Mar  3 01:56:07 2011
@@ -2930,6 +2930,7 @@ public abstract class AMQSession<C exten
     private void resubscribeConsumers() throws AMQException
     {
         ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
+        _logger.info(String.format("Resubscribing consumers = %s 
consumers.size=%d", consumers, consumers.size()));
         _consumers.clear();
 
         for (C consumer : consumers)
@@ -2943,7 +2944,7 @@ public abstract class AMQSession<C exten
     private void resubscribeProducers() throws AMQException
     {
         ArrayList producers = new ArrayList(_producers.values());
-        _logger.info(MessageFormat.format("Resubscribing producers = {0} 
producers.size={1}", producers, producers.size())); // FIXME: removeKey
+        _logger.info(String.format("Resubscribing producers = %s 
producers.size=%d", producers, producers.size()));
         for (Iterator it = producers.iterator(); it.hasNext();)
         {
             P producer = (P) it.next();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to