Repository: activemq Updated Branches: refs/heads/activemq-5.12.x c709831d8 -> 3619724d9
https://issues.apache.org/jira/browse/AMQ-5951 - ensure failover oneway won't retry if reconnect will not happen (cherry picked from commit ae9af4b8b29e792db213ca2cc2879ddc7c4118e5) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3619724d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3619724d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3619724d Branch: refs/heads/activemq-5.12.x Commit: 3619724d971c64717d304bfb1ebeba47bc904789 Parents: c709831 Author: gtully <[email protected]> Authored: Mon Aug 31 15:55:44 2015 +0100 Committer: Timothy Bish <[email protected]> Committed: Mon Aug 31 13:34:54 2015 -0400 ---------------------------------------------------------------------- .../transport/failover/FailoverTransport.java | 2 +- .../transport/failover/FailoverTimeoutTest.java | 65 +++++++++++++++++++- 2 files changed, 64 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3619724d/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 0bccac0..4e196b3 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -674,7 +674,7 @@ public class FailoverTransport implements CompositeTransport { // If the command was not tracked.. we will retry in // this method - if (tracked == null) { + if (tracked == null && canReconnect()) { // since we will retry in this method.. take it // out of the request http://git-wip-us.apache.org/repos/asf/activemq/blob/3619724d/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java index 9ef6e28..150d528 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java @@ -20,9 +20,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.net.Socket; import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; @@ -31,6 +37,7 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.MessageAck; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,7 +56,7 @@ public class FailoverTimeoutTest { public void setUp() throws Exception { bs = new BrokerService(); bs.setUseJmx(false); - bs.addConnector("tcp://localhost:0"); + bs.addConnector(getTransportUri()); bs.start(); tcpUri = bs.getTransportConnectors().get(0).getConnectUri(); } @@ -115,8 +122,62 @@ public class FailoverTimeoutTest { bs.waitUntilStarted(); producer.send(message); - bs.stop(); + connection.close(); + } + + @Test + public void testInterleaveSendAndException() throws Exception { + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0"); + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + try { + LOG.info("Deal with exception - invoke op that may block pending outstanding oneway"); + // try and invoke on connection as part of handling exception + connection.asyncSendPacket(new MessageAck()); + } catch (Exception e) { + } + } + }); + + final ExecutorService executorService = Executors.newCachedThreadPool(); + + final int NUM_TASKS = 200; + final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS); + + for (int i=0; i < NUM_TASKS; i++) { + + executorService.submit(new Runnable() { + @Override + public void run() { + try { + connection.asyncSendPacket(new MessageAck()); + } catch (JMSException e) { + e.printStackTrace(); + } finally { + enqueueOnExecutorDone.countDown(); + } + + } + }); + } + + while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) { + enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS); + } + + // force IOException + final Socket socket = connection.getTransport().narrow(Socket.class); + socket.close(); + + executorService.shutdown(); + + assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS)); } @Test
