Repository: activemq-artemis
Updated Branches:
  refs/heads/master 57b9d979f -> 619b2cb2a


ARTEMIS-1776 Blocked Bridge is not resuming after reconnect

This is still part of the ARTEMIS-1776 fix, which is still part of the same 
release we are currently working on.
Hence I'm not opening a new JIRA for this one.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e5bce133
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e5bce133
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e5bce133

Branch: refs/heads/master
Commit: e5bce13316f7e81bb15a12592622df2ea2632a35
Parents: 57b9d97
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Fri Apr 6 10:00:43 2018 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Fri Apr 6 13:17:13 2018 -0400

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    |   9 ++
 .../integration/cluster/bridge/BridgeTest.java  | 148 +++++++++++++++++++
 2 files changed, 157 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5bce133/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index e40bc46..48f59f4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -220,6 +220,11 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       this.server = server;
    }
 
+   /** For tests mainly */
+   public boolean isBlockedOnFlowControl() {
+      return blockedOnFlowControl;
+   }
+
    public static final byte[] getDuplicateBytes(final UUID nodeUUID, final 
long messageID) {
       byte[] bytes = new byte[24];
 
@@ -924,6 +929,10 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
                }
             }
 
+            // need to reset blockedOnFlowControl after creating a new producer
+            // otherwise in case the bridge was blocked before a previous 
failure
+            // this would never resume
+            blockedOnFlowControl = false;
             producer = session.createProducer();
             session.addFailureListener(BridgeImpl.this);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5bce133/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index ae60a61..2d6add7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -75,10 +75,13 @@ import 
org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@@ -251,6 +254,151 @@ public class BridgeTest extends ActiveMQTestBase {
       System.out.println(timeTaken + "ms");
    }
 
+   @Test
+   public void testBlockedBridgeAndReconnect() throws Exception {
+      long time = System.currentTimeMillis();
+      Map<String, Object> server0Params = new HashMap<>();
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, 
server0Params);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, 
server1Params);
+      server1.getAddressSettingsRepository().clear();
+      server1.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setMaxSizeBytes(10124 * 
10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      server0.getAddressSettingsRepository().clear();
+      server0.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      TransportConfiguration server0tc = new 
TransportConfiguration(getConnector(), server0Params);
+
+      TransportConfiguration server1tc = new 
TransportConfiguration(getConnector(), server1Params);
+
+      HashMap<String, TransportConfiguration> connectors = new HashMap<>();
+      connectors.put(server1tc.getName(), server1tc);
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+
+      final int messageSize = 1024;
+
+      final int numMessages = 1000;
+
+      ArrayList<String> connectorConfig = new ArrayList<>();
+      connectorConfig.add(server1tc.getName());
+      BridgeConfiguration bridgeConfiguration = new 
BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages
 * messageSize / 
2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new 
CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      CoreQueueConfiguration queueConfig1 = new 
CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+      server1.start();
+      server0.start();
+      locator = 
addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, 
server1tc));
+      ClientSessionFactory sf0 = 
addSessionFactory(locator.createSessionFactory(server0tc));
+
+      ClientSessionFactory sf1 = 
addSessionFactory(locator.createSessionFactory(server1tc));
+
+      ClientSession session0 = sf0.createSession(false, true, 0);
+      ClientProducer producer0 = session0.createProducer(new 
SimpleString(testAddress));
+
+      ClientSession session1 = sf1.createSession(true, true, 0);
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+
+      session1.start();
+
+      final byte[] bytes = new byte[messageSize];
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+
+         message.putIntProperty(propKey, i);
+
+         message.getBodyBuffer().writeBytes(bytes);
+
+         producer0.send(message);
+
+         if (i % 100 == 0) {
+            session0.commit();
+         }
+      }
+      session0.commit();
+
+      for (int i = 0; i < numMessages / 2; i++) {
+         ClientMessage message = consumer1.receive(5000);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+      session1.commit();
+
+      BridgeImpl bridge = 
(BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
+
+      // stop in the middle. wait the bridge to block
+      Wait.assertTrue("bridge is never blocked", 
bridge::isBlockedOnFlowControl);
+
+      session1.close();
+      sf1.close();
+
+      // now restart the server.. the bridge should be reconnecting now
+      server1.stop();
+      server1.start();
+
+      sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+      session1 = sf1.createSession(true, true, 0);
+      consumer1 = session1.createConsumer(queueName1);
+      session1.start();
+
+      // consume the rest of the messages
+      for (int i = numMessages / 2; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(5000);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+
+
+      Wait.assertEquals(0, 
server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount);
+
+      Assert.assertNull(consumer1.receiveImmediate());
+
+      session0.close();
+
+      session1.close();
+
+      sf0.close();
+
+      sf1.close();
+
+      closeFields();
+      if (server0.getConfiguration().isPersistenceEnabled()) {
+         assertEquals(0, loadQueues(server0).size());
+      }
+      long timeTaken = System.currentTimeMillis() - time;
+      System.out.println(timeTaken + "ms");
+   }
+
    public void internaltestSimpleBridge(final boolean largeMessage, final 
boolean useFiles) throws Exception {
       Map<String, Object> server0Params = new HashMap<>();
       server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, 
server0Params);

Reply via email to