Author: orudyy
Date: Wed May 11 15:02:00 2016
New Revision: 1743386

URL: http://svn.apache.org/viewvc?rev=1743386&view=rev
Log:
QPID-7253: [Java Client] [0-10] Ensure session creation awaits failover 
completion

          merged from trunk using
          svn merge -c 1742544,1742926 ^/qpid/java/trunk

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 11 15:02:00 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742544,1742926
 /qpid/trunk/qpid:796646-796653

Modified: 
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1743386&r1=1743385&r2=1743386&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 (original)
+++ 
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 Wed May 11 15:02:00 2016
@@ -40,6 +40,7 @@ import org.apache.qpid.QpidException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.transport.ClientConnectionDelegate;
 import org.apache.qpid.client.util.JMSExceptionHelper;
 import org.apache.qpid.common.ServerPropertyNames;
@@ -100,7 +101,7 @@ public class AMQConnectionDelegate_0_10
         return 
createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
     }
 
-    public Session createSession(boolean transacted, int acknowledgeMode, int 
prefetchHigh, int prefetchLow, String name)
+    private Session createSession(final boolean transacted, final int 
acknowledgeMode, final int prefetchHigh, final int prefetchLow, final String 
name)
             throws JMSException
     {
         _conn.checkNotClosed();
@@ -110,24 +111,36 @@ public class AMQConnectionDelegate_0_10
             throw new 
ChannelLimitReachedException(_conn.getMaximumChannelCount());
         }
 
-        int channelId = _conn.getNextChannelID();
-        AMQSession session;
-        try
+        return new FailoverRetrySupport<>(new 
FailoverProtectedOperation<Session, JMSException>()
         {
-            session = new AMQSession_0_10(_qpidConnection, _conn, channelId, 
transacted, acknowledgeMode, prefetchHigh,
-                    prefetchLow,name);
-            _conn.registerSession(channelId, session);
-            if (_conn.started())
+            @Override
+            public Session execute() throws JMSException, FailoverException
             {
-                session.start();
+                int channelId = _conn.getNextChannelID();
+                try
+                {
+                    AMQSession session = new AMQSession_0_10(_qpidConnection,
+                                                             _conn,
+                                                             channelId,
+                                                             transacted,
+                                                             acknowledgeMode,
+                                                             prefetchHigh,
+                                                             prefetchLow,
+                                                             name);
+                    _conn.registerSession(channelId, session);
+                    if (_conn.started())
+                    {
+                        session.start();
+                    }
+                    return session;
+                }
+                catch (Exception e)
+                {
+                    _logger.error("exception creating session:", e);
+                    throw JMSExceptionHelper.chainJMSException(new 
JMSException("cannot create session"), e);
+                }
             }
-        }
-        catch (Exception e)
-        {
-            _logger.error("exception creating session:", e);
-            throw JMSExceptionHelper.chainJMSException(new 
JMSException("cannot create session"), e);
-        }
-        return session;
+        }, _conn).execute();
     }
 
     /**
@@ -271,10 +284,10 @@ public class AMQConnectionDelegate_0_10
 
     public void resubscribeSessions() throws JMSException, QpidException, 
FailoverException
     {
-        _logger.info("Resuming connection");
+        _logger.debug("Resuming connection");
         getQpidConnection().resume();
-        List<AMQSession> sessions = new 
ArrayList<AMQSession>(_conn.getSessions().values());
-        _logger.info(String.format("Resubscribing sessions = %s 
sessions.size=%d", sessions, sessions.size()));
+        List<AMQSession> sessions = _conn.getSessions().values();
+        _logger.debug("Resubscribing sessions = {} sessions.size = {}", 
sessions, sessions.size());
         for (AMQSession s : sessions)
         {
             s.resubscribe();
@@ -468,17 +481,22 @@ public class AMQConnectionDelegate_0_10
             }
             catch (InterruptedException e)
             {
-                //ignore
+                Thread.currentThread().interrupt();
+                return null;
             }
         }
 
-        try
+        synchronized (_conn.getFailoverMutex())
         {
-            return operation.execute();
-        }
-        catch (FailoverException e)
-        {
-            throw new RuntimeException(e);
+            try
+            {
+                return operation.execute();
+            }
+            catch (FailoverException e)
+            {
+                // FailoverException never thrown on 0-10 path
+                throw new RuntimeException(e);
+            }
         }
     }
 

Modified: 
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1743386&r1=1743385&r2=1743386&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 (original)
+++ 
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 Wed May 11 15:02:00 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -67,7 +66,7 @@ public class AMQConnectionDelegate_8_0 i
 {
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
 
-    // deprectaed legacy name for the option
+    // deprecated legacy name for the option
     private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = 
"amqj.default_syncwrite_timeout";
 
     private final AMQConnection _conn;
@@ -363,8 +362,8 @@ public class AMQConnectionDelegate_8_0 i
      */
     public void resubscribeSessions() throws JMSException, QpidException, 
FailoverException
     {
-        ArrayList sessions = new ArrayList(_conn.getSessions().values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} 
sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        List<AMQSession> sessions = _conn.getSessions().values();
+        _logger.debug("Resubscribing sessions = {} sessions.size = {}", 
sessions, sessions.size());
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
             AMQSession_0_8 s = (AMQSession_0_8) it.next();
@@ -405,7 +404,7 @@ public class AMQConnectionDelegate_8_0 i
             catch (InterruptedException e)
             {
                 _logger.debug("Interrupted: " + e, e);
-
+                Thread.currentThread().interrupt();
                 return null;
             }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to