Author: orudyy
Date: Wed May 11 15:02:00 2016
New Revision: 1743386
URL: http://svn.apache.org/viewvc?rev=1743386&view=rev
Log:
QPID-7253: [Java Client] [0-10] Ensure session creation awaits failover completion.
Merged from trunk using:
    svn merge -c 1742544,1742926 ^/qpid/java/trunk
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 11 15:02:00 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742544,1742926
/qpid/trunk/qpid:796646-796653
Modified:
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1743386&r1=1743385&r2=1743386&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Wed May 11 15:02:00 2016
@@ -40,6 +40,7 @@ import org.apache.qpid.QpidException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.ServerPropertyNames;
@@ -100,7 +101,7 @@ public class AMQConnectionDelegate_0_10
return
createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
}
- public Session createSession(boolean transacted, int acknowledgeMode, int
prefetchHigh, int prefetchLow, String name)
+ private Session createSession(final boolean transacted, final int
acknowledgeMode, final int prefetchHigh, final int prefetchLow, final String
name)
throws JMSException
{
_conn.checkNotClosed();
@@ -110,24 +111,36 @@ public class AMQConnectionDelegate_0_10
throw new
ChannelLimitReachedException(_conn.getMaximumChannelCount());
}
- int channelId = _conn.getNextChannelID();
- AMQSession session;
- try
+ return new FailoverRetrySupport<>(new
FailoverProtectedOperation<Session, JMSException>()
{
- session = new AMQSession_0_10(_qpidConnection, _conn, channelId,
transacted, acknowledgeMode, prefetchHigh,
- prefetchLow,name);
- _conn.registerSession(channelId, session);
- if (_conn.started())
+ @Override
+ public Session execute() throws JMSException, FailoverException
{
- session.start();
+ int channelId = _conn.getNextChannelID();
+ try
+ {
+ AMQSession session = new AMQSession_0_10(_qpidConnection,
+ _conn,
+ channelId,
+ transacted,
+ acknowledgeMode,
+ prefetchHigh,
+ prefetchLow,
+ name);
+ _conn.registerSession(channelId, session);
+ if (_conn.started())
+ {
+ session.start();
+ }
+ return session;
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception creating session:", e);
+ throw JMSExceptionHelper.chainJMSException(new
JMSException("cannot create session"), e);
+ }
}
- }
- catch (Exception e)
- {
- _logger.error("exception creating session:", e);
- throw JMSExceptionHelper.chainJMSException(new
JMSException("cannot create session"), e);
- }
- return session;
+ }, _conn).execute();
}
/**
@@ -271,10 +284,10 @@ public class AMQConnectionDelegate_0_10
public void resubscribeSessions() throws JMSException, QpidException,
FailoverException
{
- _logger.info("Resuming connection");
+ _logger.debug("Resuming connection");
getQpidConnection().resume();
- List<AMQSession> sessions = new
ArrayList<AMQSession>(_conn.getSessions().values());
- _logger.info(String.format("Resubscribing sessions = %s
sessions.size=%d", sessions, sessions.size()));
+ List<AMQSession> sessions = _conn.getSessions().values();
+ _logger.debug("Resubscribing sessions = {} sessions.size = {}",
sessions, sessions.size());
for (AMQSession s : sessions)
{
s.resubscribe();
@@ -468,17 +481,22 @@ public class AMQConnectionDelegate_0_10
}
catch (InterruptedException e)
{
- //ignore
+ Thread.currentThread().interrupt();
+ return null;
}
}
- try
+ synchronized (_conn.getFailoverMutex())
{
- return operation.execute();
- }
- catch (FailoverException e)
- {
- throw new RuntimeException(e);
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ // FailoverException never thrown on 0-10 path
+ throw new RuntimeException(e);
+ }
}
}
Modified:
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1743386&r1=1743385&r2=1743386&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++
qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Wed May 11 15:02:00 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -67,7 +66,7 @@ public class AMQConnectionDelegate_8_0 i
{
private static final Logger _logger =
LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
- // deprectaed legacy name for the option
+ // deprecated legacy name for the option
private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT =
"amqj.default_syncwrite_timeout";
private final AMQConnection _conn;
@@ -363,8 +362,8 @@ public class AMQConnectionDelegate_8_0 i
*/
public void resubscribeSessions() throws JMSException, QpidException,
FailoverException
{
- ArrayList sessions = new ArrayList(_conn.getSessions().values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0}
sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+ List<AMQSession> sessions = _conn.getSessions().values();
+ _logger.debug("Resubscribing sessions = {} sessions.size = {}",
sessions, sessions.size());
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession_0_8 s = (AMQSession_0_8) it.next();
@@ -405,7 +404,7 @@ public class AMQConnectionDelegate_8_0 i
catch (InterruptedException e)
{
_logger.debug("Interrupted: " + e, e);
-
+ Thread.currentThread().interrupt();
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]