Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 0791ee274 -> b703ca315


ARTEMIS-1966 Replication channel closed but not connection if flow controlled 
during replication

(cherry picked from commit 9f8288c0156072b3ae02efd04e1adbcd0abca2c2)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8647ef65
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8647ef65
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8647ef65

Branch: refs/heads/2.6.x
Commit: 8647ef65c1ff88c8f50366b12798bb573ce1f331
Parents: 0791ee2
Author: yang wei <wy96...@gmail.com>
Authored: Thu Jul 5 17:48:11 2018 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Jul 12 14:50:59 2018 -0400

----------------------------------------------------------------------
 .../core/replication/ReplicationManager.java    |   1 +
 ...SharedNothingReplicationFlowControlTest.java | 295 +++++++++++++++++++
 2 files changed, 296 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8647ef65/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index be5963a..fbf7c6c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -302,6 +302,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       RemotingConnection toStop = remotingConnection;
       if (toStop != null) {
          toStop.removeFailureListener(failureListener);
+         toStop.destroy();
       }
       remotingConnection = null;
       started = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8647ef65/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
new file mode 100644
index 0000000..381b617
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.replication;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import 
org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class SharedNothingReplicationFlowControlTest {
+
+   private static final Logger logger = 
Logger.getLogger(SharedNothingReplicationFlowControlTest.class);
+
+   @Rule
+   public TemporaryFolder brokersFolder = new TemporaryFolder();
+
+   @Test
+   public void testReplicationIfFlowControlled() throws Exception {
+      // start live
+      Configuration liveConfiguration = createLiveConfiguration();
+      ActiveMQServer liveServer = 
ActiveMQServers.newActiveMQServer(liveConfiguration);
+      liveServer.start();
+
+      Wait.waitFor(() -> liveServer.isStarted());
+
+      ServerLocator locator = 
ServerLocatorImpl.newLocator("tcp://localhost:61616");
+      locator.setCallTimeout(60_000L);
+      locator.setConnectionTTL(60_000L);
+
+      final ClientSessionFactory csf = locator.createSessionFactory();
+      ClientSession sess = csf.createSession();
+      sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", 
true);
+      sess.close();
+      Executor sendMessageExecutor = Executors.newCachedThreadPool();
+
+      int i = 0;
+      final int j = 100;
+      final CountDownLatch allMessageSent = new CountDownLatch(j);
+
+      // start backup
+      Configuration backupConfiguration = createBackupConfiguration();
+      ActiveMQServer backupServer = 
ActiveMQServers.newActiveMQServer(backupConfiguration);
+      backupServer.start();
+
+      Wait.waitFor(() -> backupServer.isStarted());
+
+      Wait.waitFor(backupServer::isReplicaSync, 30000);
+
+      TestInterceptor interceptor = new TestInterceptor(30000);
+      // Increase latency of handling packet on backup side and flow control 
would work
+      
backupServer.getClusterManager().getClusterController().addIncomingInterceptorForReplication(interceptor);
+
+      byte[] body = new byte[32 * 1024];
+      while (i < j) {
+         sendMessageExecutor.execute(() -> {
+            try {
+               ClientSession session = csf.createSession(true, true);
+               ClientProducer producer = session.createProducer("flowcontrol");
+               ClientMessage message = session.createMessage(true);
+               message.writeBodyBufferBytes(body);
+               logger.infof("try to send a message after replicated");
+               producer.send(message);
+               logger.info("send message done");
+               producer.close();
+               session.close();
+
+               allMessageSent.countDown();
+            } catch (ActiveMQException e) {
+               logger.error("send message", e);
+            }
+         });
+         i++;
+      }
+
+      Assert.assertTrue("all message sent", allMessageSent.await(30, 
TimeUnit.SECONDS));
+      interceptor.setSleepTime(0);
+
+      csf.close();
+      locator.close();
+      Assert.assertTrue("Waiting for replica sync timeout", 
Wait.waitFor(liveServer::isReplicaSync, 30000));
+      backupServer.stop(true);
+      liveServer.stop(true);
+
+      SequentialFileFactory fileFactory;
+
+      File liveJournalDir = 
brokersFolder.getRoot().toPath().resolve("live").resolve("data").resolve("journal").toFile();
+      fileFactory = new 
MappedSequentialFileFactory(liveConfiguration.getJournalLocation(), 
liveConfiguration.getJournalFileSize(), false, 
liveConfiguration.getJournalBufferSize_NIO(), 
liveConfiguration.getJournalBufferTimeout_NIO(), null);
+
+      JournalImpl liveMessageJournal = new 
JournalImpl(liveConfiguration.getJournalFileSize(), 
liveConfiguration.getJournalMinFiles(), 
liveConfiguration.getJournalPoolFiles(), 
liveConfiguration.getJournalCompactMinFiles(), 
liveConfiguration.getJournalCompactPercentage(), fileFactory, "activemq-data", 
"amq", fileFactory.getMaxIO());
+
+      liveMessageJournal.start();
+      final AtomicInteger liveJournalCounter = new AtomicInteger();
+      liveMessageJournal.load(new 
SharedNothingReplicationFlowControlTest.AddRecordLoaderCallback() {
+         @Override
+         public void addRecord(RecordInfo info) {
+            if (!(info.userRecordType == 
JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
+               // ignore
+            }
+            logger.infof("got live message %d %d", info.id, 
info.userRecordType);
+            liveJournalCounter.incrementAndGet();
+         }
+      });
+
+      // read backup's journal
+      File backupJournalDir = 
brokersFolder.getRoot().toPath().resolve("backup").resolve("data").resolve("journal").toFile();
+      fileFactory = new 
MappedSequentialFileFactory(backupConfiguration.getJournalLocation(), 
backupConfiguration.getJournalFileSize(), false, 
backupConfiguration.getJournalBufferSize_NIO(), 
backupConfiguration.getJournalBufferTimeout_NIO(), null);
+
+      JournalImpl backupMessageJournal = new 
JournalImpl(backupConfiguration.getJournalFileSize(), 
backupConfiguration.getJournalMinFiles(), 
backupConfiguration.getJournalPoolFiles(), 
backupConfiguration.getJournalCompactMinFiles(), 
backupConfiguration.getJournalCompactPercentage(), fileFactory, 
"activemq-data", "amq", fileFactory.getMaxIO());
+
+      backupMessageJournal.start();
+
+      final AtomicInteger replicationCounter = new AtomicInteger();
+      backupMessageJournal.load(new 
SharedNothingReplicationFlowControlTest.AddRecordLoaderCallback() {
+         @Override
+         public void addRecord(RecordInfo info) {
+            if (!(info.userRecordType == 
JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
+               // ignore
+            }
+            logger.infof("replicated message %d", info.id);
+            replicationCounter.incrementAndGet();
+         }
+      });
+
+      logger.infof("expected %d messages, live=%d, backup=%d", j, 
liveJournalCounter.get(), replicationCounter.get());
+      Assert.assertEquals("Live lost journal record", j, 
liveJournalCounter.get());
+      Assert.assertEquals("Backup did not replicated all journal", j, 
replicationCounter.get());
+   }
+
+   // Set a small call timeout and write buffer high water mark value to 
trigger replication flow control
+   private Configuration createLiveConfiguration() throws Exception {
+      Configuration conf = new ConfigurationImpl();
+      conf.setName("localhost::live");
+
+      File liveDir = brokersFolder.newFolder("live");
+      conf.setBrokerInstance(liveDir);
+
+      conf.addAcceptorConfiguration("live", 
"tcp://localhost:61616?writeBufferHighWaterMark=2048&writeBufferLowWaterMark=2048");
+      conf.addConnectorConfiguration("backup", "tcp://localhost:61617");
+      conf.addConnectorConfiguration("live", "tcp://localhost:61616");
+
+      conf.setClusterUser("mycluster");
+      conf.setClusterPassword("mypassword");
+
+      ReplicatedPolicyConfiguration haPolicy = new 
ReplicatedPolicyConfiguration();
+      haPolicy.setVoteOnReplicationFailure(false);
+      haPolicy.setCheckForLiveServer(false);
+      conf.setHAPolicyConfiguration(haPolicy);
+
+      ClusterConnectionConfiguration ccconf = new 
ClusterConnectionConfiguration();
+      ccconf.setStaticConnectors(new 
ArrayList<>()).getStaticConnectors().add("backup");
+      ccconf.setName("cluster");
+      ccconf.setConnectorName("live");
+      ccconf.setCallTimeout(4000);
+      conf.addClusterConfiguration(ccconf);
+
+      
conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024
 * 512).setConnectionTTLOverride(60_000L);
+
+      return conf;
+   }
+
+   private Configuration createBackupConfiguration() throws Exception {
+      Configuration conf = new ConfigurationImpl();
+      conf.setName("localhost::backup");
+
+      File backupDir = brokersFolder.newFolder("backup");
+      conf.setBrokerInstance(backupDir);
+
+      ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration();
+      haPolicy.setClusterName("cluster");
+      conf.setHAPolicyConfiguration(haPolicy);
+
+      conf.addAcceptorConfiguration("backup", "tcp://localhost:61617");
+      conf.addConnectorConfiguration("live", "tcp://localhost:61616");
+      conf.addConnectorConfiguration("backup", "tcp://localhost:61617");
+
+      conf.setClusterUser("mycluster");
+      conf.setClusterPassword("mypassword");
+
+      ClusterConnectionConfiguration ccconf = new 
ClusterConnectionConfiguration();
+      ccconf.setStaticConnectors(new 
ArrayList<>()).getStaticConnectors().add("live");
+      ccconf.setName("cluster");
+      ccconf.setConnectorName("backup");
+      conf.addClusterConfiguration(ccconf);
+
+      /**
+       * Set a bad url then, as a result the backup node would make a decision
+       * of replicating from live node in the case of connection failure.
+       * Set big check period to not schedule checking which would stop server.
+       */
+      
conf.setNetworkCheckPeriod(1000000).setNetworkCheckURLList("http://localhost:28787";).setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024
 * 512).setConnectionTTLOverride(60_000L);
+
+      return conf;
+   }
+
+   abstract class AddRecordLoaderCallback implements LoaderCallback {
+
+      @Override
+      public void addPreparedTransaction(PreparedTransactionInfo 
preparedTransaction) {
+
+      }
+
+      @Override
+      public void deleteRecord(long id) {
+
+      }
+
+      @Override
+      public void updateRecord(RecordInfo info) {
+
+      }
+
+      @Override
+      public void failedTransaction(long transactionID, List<RecordInfo> 
records, List<RecordInfo> recordsToDelete) {
+
+      }
+   }
+
+   public static final class TestInterceptor implements Interceptor {
+
+      private long sleepTime;
+
+      public TestInterceptor(long sleepTime) {
+         this.sleepTime = sleepTime;
+      }
+
+      public void setSleepTime(long sleepTime) {
+         this.sleepTime = sleepTime;
+      }
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) 
throws ActiveMQException {
+         try {
+            long startTime = System.currentTimeMillis();
+            while (System.currentTimeMillis() < startTime + sleepTime) {
+               Thread.sleep(100);
+            }
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+         return true;
+      }
+   }
+}

Reply via email to