This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 205a139  ARTEMIS-2440 Call timeout should retry the connection 
asynchronously
     new a098685  This closes #2786
205a139 is described below

commit 205a1399e7777ec6abc9f0a8f2cfe8b337a861f0
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Aug 5 10:36:21 2019 -0400

    ARTEMIS-2440 Call timeout should retry the connection asynchronously
---
 .../core/client/impl/ServerLocatorImpl.java        |  5 +-
 .../core/impl/ActiveMQClientProtocolManager.java   | 11 +++-
 .../protocol/core/impl/RemotingConnectionImpl.java | 17 +++---
 .../spi/core/remoting/ClientProtocolManager.java   |  3 +
 .../amqp/broker/AMQPConnectionCallback.java        |  8 ---
 .../broker/ActiveMQProtonRemotingConnection.java   |  8 +--
 .../amqp/client/ProtonClientProtocolManager.java   |  6 ++
 .../core/protocol/openwire/OpenWireConnection.java |  4 +-
 .../protocol/openwire/OpenWireProtocolManager.java |  2 +-
 .../artemis/core/config/Configuration.java         |  3 +
 .../protocol/core/impl/CoreProtocolManager.java    |  2 +-
 .../wireformat/ReplicationSyncFileMessageTest.java |  4 +-
 ...ListenerForConnectionTimedOutExceptionTest.java | 68 ++++++++++++++++++++++
 13 files changed, 111 insertions(+), 30 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 8ac7c9e..a05a8af 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -587,7 +587,10 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
 
    @Override
    public ClientProtocolManager newProtocolManager() {
-      return getProtocolManagerFactory().newProtocolManager();
+      if (threadPool == null) {
+         throw new NullPointerException("No Thread Pool");
+      }
+      return getProtocolManagerFactory().newProtocolManager().setExecutor(new 
OrderedExecutor(threadPool));
    }
 
    @Override
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 9eb0ee5..b1d6cc8 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
@@ -82,6 +83,8 @@ public class ActiveMQClientProtocolManager implements 
ClientProtocolManager {
 
    private ClientSessionFactoryInternal factoryInternal;
 
+   private Executor executor;
+
    /**
     * Guards assignments to {@link #inCreateSession} and {@link 
#inCreateSessionLatch}
     */
@@ -158,6 +161,12 @@ public class ActiveMQClientProtocolManager implements 
ClientProtocolManager {
    }
 
    @Override
+   public ActiveMQClientProtocolManager setExecutor(Executor executor) {
+      this.executor = executor;
+      return this;
+   }
+
+   @Override
    public Lock lockSessionCreation() {
       try {
          Lock localFailoverLock = factoryInternal.lockFailover();
@@ -412,7 +421,7 @@ public class ActiveMQClientProtocolManager implements 
ClientProtocolManager {
                                      List<Interceptor> incomingInterceptors,
                                      List<Interceptor> outgoingInterceptors,
                                      TopologyResponseHandler 
topologyResponseHandler) {
-      this.connection = new RemotingConnectionImpl(createPacketDecoder(), 
transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, 
outgoingInterceptors);
+      this.connection = new RemotingConnectionImpl(createPacketDecoder(), 
transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, 
outgoingInterceptors, executor);
 
       this.topologyResponseHandler = topologyResponseHandler;
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 065277a..418e3f1 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -92,8 +92,9 @@ public class RemotingConnectionImpl extends 
AbstractRemotingConnection implement
                                  final long blockingCallTimeout,
                                  final long blockingCallFailoverTimeout,
                                  final List<Interceptor> incomingInterceptors,
-                                 final List<Interceptor> outgoingInterceptors) 
{
-      this(packetDecoder, transportConnection, blockingCallTimeout, 
blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, 
null, null);
+                                 final List<Interceptor> outgoingInterceptors,
+                                 final Executor connectionExecutor) {
+      this(packetDecoder, transportConnection, blockingCallTimeout, 
blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, 
null, connectionExecutor);
    }
 
    /*
@@ -103,9 +104,9 @@ public class RemotingConnectionImpl extends 
AbstractRemotingConnection implement
                           final Connection transportConnection,
                           final List<Interceptor> incomingInterceptors,
                           final List<Interceptor> outgoingInterceptors,
-                          final Executor executor,
-                          final SimpleString nodeID) {
-      this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, 
outgoingInterceptors, false, executor, nodeID);
+                          final SimpleString nodeID,
+                          final Executor connectionExecutor) {
+      this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, 
outgoingInterceptors, false, nodeID, connectionExecutor);
    }
 
    private RemotingConnectionImpl(final PacketDecoder packetDecoder,
@@ -115,9 +116,9 @@ public class RemotingConnectionImpl extends 
AbstractRemotingConnection implement
                                   final List<Interceptor> incomingInterceptors,
                                   final List<Interceptor> outgoingInterceptors,
                                   final boolean client,
-                                  final Executor executor,
-                                  final SimpleString nodeID) {
-      super(transportConnection, executor);
+                                  final SimpleString nodeID,
+                                  final Executor connectionExecutor) {
+      super(transportConnection, connectionExecutor);
 
       this.packetDecoder = packetDecoder;
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
index e2c9fc1..37e699e 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.spi.core.remoting;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Lock;
 
 import io.netty.channel.ChannelPipeline;
@@ -27,6 +28,8 @@ import 
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public interface ClientProtocolManager {
 
+   ClientProtocolManager setExecutor(Executor executor);
+
    /// Life Cycle Methods:
 
    RemotingConnection connect(Connection transportConnection,
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index d34ce80..1667945 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -182,14 +182,6 @@ public class AMQPConnectionCallback implements 
FailureListener, CloseListener {
       }
    }
 
-   public Executor getExeuctor() {
-      if (protonConnectionDelegate != null) {
-         return protonConnectionDelegate.getExecutor();
-      } else {
-         return null;
-      }
-   }
-
    public void setConnection(AMQPConnectionContext connection) {
       this.amqpConnection = connection;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 5692079..971702e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -46,17 +46,13 @@ public class ActiveMQProtonRemotingConnection extends 
AbstractRemotingConnection
    public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
                                            AMQPConnectionContext 
amqpConnection,
                                            Connection transportConnection,
-                                           Executor executor) {
-      super(transportConnection, executor);
+                                           Executor connectionExecutor) {
+      super(transportConnection, connectionExecutor);
       this.manager = manager;
       this.amqpConnection = amqpConnection;
       transportConnection.setProtocolConnection(this);
    }
 
-   public Executor getExecutor() {
-      return this.executor;
-   }
-
    public ProtonProtocolManager getManager() {
       return manager;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
index 54b8c67..43ae226 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
@@ -31,6 +31,7 @@ import 
org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -45,6 +46,11 @@ public class ProtonClientProtocolManager extends 
ProtonProtocolManager implement
    }
 
    @Override
+   public ClientProtocolManager setExecutor(Executor executor) {
+      return null;
+   }
+
+   @Override
    public void stop() {
       throw new UnsupportedOperationException();
    }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0b94ab2..6c846ba 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -198,9 +198,9 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
-                             Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
-                             OpenWireFormat wf) {
+                             OpenWireFormat wf,
+                             Executor executor) {
       super(connection, executor);
       this.server = server;
       this.operationContext = server.newOperationContext();
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 505564d..44cc8da 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -232,7 +232,7 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, Cl
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection connection) {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(connection, server, 
server.getExecutorFactory().getExecutor(), this, wf);
+      OpenWireConnection owConn = new OpenWireConnection(connection, server, 
this, wf, server.getExecutorFactory().getExecutor());
       owConn.sendHandshake();
 
       //first we setup ttl to -1
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 40fb4f3..c3fb985 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -328,15 +328,18 @@ public interface Configuration {
    Configuration setAmqpUseCoreSubscriptionNaming(boolean 
amqpUseCoreSubscriptionNaming);
 
    /**
+    * deprecated: we decide based on the semantic context when to make things 
async or not
     * Returns whether code coming from connection is executed asynchronously 
or not. <br>
     * Default value is
     * {@link 
org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED}.
     */
+   @Deprecated
    boolean isAsyncConnectionExecutionEnabled();
 
    /**
     * Sets whether code coming from connection is executed asynchronously or 
not.
     */
+   @Deprecated
    Configuration setEnabledAsyncConnectionExecution(boolean enabled);
 
    /**
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index 7a416d9..5862003 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -116,7 +116,7 @@ public class CoreProtocolManager implements 
ProtocolManager<Interceptor> {
 
       Executor connectionExecutor = server.getExecutorFactory().getExecutor();
 
-      final CoreRemotingConnection rc = new RemotingConnectionImpl(new 
ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, 
config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, 
server.getNodeID());
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(new 
ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, 
server.getNodeID(), connectionExecutor);
 
       Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
index 812de2c..f01e5e6 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
@@ -49,7 +49,7 @@ public class ReplicationSyncFileMessageTest extends 
ActiveMQTestBase {
       FileChannel fileChannel = raf.getChannel();
       ReplicationSyncFileMessage replicationSyncFileMessage = new 
ReplicationSyncFileMessage(MESSAGES,
                                                                                
              null, 10, raf, fileChannel, 0, dataSize);
-      RemotingConnectionImpl remotingConnection = new 
RemotingConnectionImpl(null, conn, 10, 10, null, null);
+      RemotingConnectionImpl remotingConnection = new 
RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
       ActiveMQBuffer buffer = 
replicationSyncFileMessage.encode(remotingConnection);
       Assert.assertEquals(buffer.getInt(0), 
replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
       Assert.assertEquals(buffer.capacity(), 
replicationSyncFileMessage.expectedEncodeSize() - dataSize);
@@ -69,7 +69,7 @@ public class ReplicationSyncFileMessageTest extends 
ActiveMQTestBase {
       FileChannel fileChannel = raf.getChannel();
       ReplicationSyncFileMessage replicationSyncFileMessage = new 
ReplicationSyncFileMessage(MESSAGES,
                                                                                
              null, fileId, raf, fileChannel, 0, 0);
-      RemotingConnectionImpl remotingConnection = new 
RemotingConnectionImpl(null, conn, 10, 10, null, null);
+      RemotingConnectionImpl remotingConnection = new 
RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
       ActiveMQBuffer buffer = 
replicationSyncFileMessage.encode(remotingConnection);
       Assert.assertEquals(buffer.readInt(), 
replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
       Assert.assertEquals(buffer.capacity(), 
replicationSyncFileMessage.expectedEncodeSize());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
index 6579e0b..3206c2c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.jms.connection;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -50,6 +51,18 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
 
    @Test(timeout = 60000)
    public void testOnAcknowledge() throws Exception {
+      testOnAcknowledge(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testOnAcknowledgeBlockOnFailover() throws Exception {
+      // this is validating a case where failover would block
+      // and yet the exception should already happen asynchronously
+      testOnAcknowledge(true);
+   }
+
+   public void testOnAcknowledge(boolean blockOnFailover) throws Exception {
+      mayBlock.set(blockOnFailover);
       Connection sendConnection = null;
       Connection connection = null;
       AtomicReference<JMSException> exceptionOnConnection = new 
AtomicReference<>();
@@ -86,6 +99,10 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
          fail("JMSException expected");
 
       } catch (JMSException e) {
+         if (blockOnFailover) {
+            Wait.assertTrue(blocked::get);
+            unblock();
+         }
          assertTrue(e.getCause() instanceof 
ActiveMQConnectionTimedOutException);
          //Ensure JMS Connection ExceptionListener was also invoked
          assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 
2000, 100));
@@ -102,6 +119,16 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
 
    @Test(timeout = 60000)
    public void testOnSend() throws Exception {
+      testOnSend(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testOnSendBlockOnFailover() throws Exception {
+      testOnSend(true);
+   }
+
+   public void testOnSend(boolean blockOnFailover) throws Exception {
+      mayBlock.set(blockOnFailover);
       Connection sendConnection = null;
       Connection connection = null;
       AtomicReference<JMSException> exceptionOnConnection = new 
AtomicReference<>();
@@ -125,6 +152,10 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
          fail("JMSException expected");
 
       } catch (JMSException e) {
+         if (blockOnFailover) {
+            Wait.assertTrue(blocked::get);
+            unblock();
+         }
          assertTrue(e.getCause() instanceof 
ActiveMQConnectionTimedOutException);
          //Ensure JMS Connection ExceptionListener was also invoked
          assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 
2000, 100));
@@ -140,6 +171,30 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
       }
    }
 
+   static AtomicBoolean mayBlock = new AtomicBoolean(true);
+   static AtomicBoolean blocked = new AtomicBoolean(false);
+
+   private static void block() {
+      if (!mayBlock.get()) {
+         return;
+      }
+
+      blocked.set(true);
+
+      try {
+         long timeOut = System.currentTimeMillis() + 5000;
+         while (mayBlock.get() && System.currentTimeMillis() < timeOut) {
+            Thread.yield();
+         }
+      } finally {
+         blocked.set(false);
+      }
+   }
+
+   private static void unblock() {
+      mayBlock.set(false);
+   }
+
 
    static Packet lastPacketSent;
 
@@ -156,6 +211,12 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
 
       @Override
       public boolean intercept(Packet packet, RemotingConnection connection) 
throws ActiveMQException {
+         // CheckForFailoverReply is ignored here, as this is simulating an 
issue where the server is completely not responding, the blocked call should 
throw an exception asynchrnously to the retry
+         if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
+            block();
+            return true;
+         }
+
          if (lastPacketSent.getType() == PacketImpl.SESS_ACKNOWLEDGE && 
packet.getType() == PacketImpl.NULL_RESPONSE) {
             return false;
          }
@@ -167,9 +228,16 @@ public class 
ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
 
       @Override
       public boolean intercept(Packet packet, RemotingConnection connection) 
throws ActiveMQException {
+         // CheckForFailoverReply is ignored here, as this is simulating an 
issue where the server is completely not responding, the blocked call should 
throw an exception asynchrnously to the retry
+         if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
+            block();
+            return true;
+         }
+
          if (lastPacketSent.getType() == PacketImpl.SESS_SEND && 
packet.getType() == PacketImpl.NULL_RESPONSE) {
             return false;
          }
+
          return true;
       }
    }

Reply via email to