This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 49189cd7e6 ARTEMIS-4776 Pages may leak as open on Replicated Target
49189cd7e6 is described below

commit 49189cd7e63a64fcda947dbd72fd7849348b71c9
Author: Clebert Suconic <[email protected]>
AuthorDate: Sat May 18 10:48:48 2024 -0400

    ARTEMIS-4776 Pages may leak as open on Replicated Target
    
    PagingStore is supposed to send an event to replica on every file that is 
closed.
    There are a few situations where the sendClose is being missed, and that
could generate leaks on the target.
---
 .../activemq/artemis/core/paging/impl/Page.java    |   8 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |   4 +-
 .../core/replication/ReplicationEndpoint.java      |   4 +
 .../core/replication/ReplicationManager.java       |   3 +
 .../artemis/utils/cli/helper/HelperCreate.java     |  16 +-
 .../failover/NettyReplicatedFailoverTest.java      |  88 +++++
 .../artemis/tests/smoke/checkTest/CheckTest.java   |   4 +-
 .../mirror/ReplicatedMirrorTargetTest.java         | 419 +++++++++++++++++++++
 8 files changed, 530 insertions(+), 16 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index b2c01852a8..ffd6e4e064 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -231,21 +231,21 @@ public final class Page  {
       return isOpen;
    }
 
-   public void close(boolean sendEvent) throws Exception {
-      close(sendEvent, true);
+   public void close(boolean sendReplicaClose) throws Exception {
+      close(sendReplicaClose, true);
    }
 
    /**
     * sendReplicaClose means it's a close happening from a major event such as moveNext.
     * While reading the cache we don't need to (and shouldn't) inform the backup.
     */
-   public synchronized void close(boolean sendEvent, boolean waitSync) throws 
Exception {
+   public synchronized void close(boolean sendReplicaClose, boolean waitSync) 
throws Exception {
       if (readFileBuffer != null) {
          fileFactory.releaseDirectBuffer(readFileBuffer);
          readFileBuffer = null;
       }
 
-      if (sendEvent && storageManager != null) {
+      if (sendReplicaClose && storageManager != null) {
          storageManager.pageClosed(storeName, pageId);
       }
       file.close(waitSync, waitSync);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 63baad12c4..28e7de80e6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -577,7 +577,7 @@ public class PagingStoreImpl implements PagingStore {
 
       final Page page = currentPage;
       if (page != null) {
-         page.close(false);
+         page.close(true);
          currentPage = null;
       }
    }
@@ -994,7 +994,7 @@ public class PagingStoreImpl implements PagingStore {
                }
 
                returnPage = currentPage;
-               returnPage.close(false);
+               returnPage.close(true);
                resetCurrentPage(null);
 
                // The current page is empty... which means we reached the end 
of the pages
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index b3951c9d1c..4810ba9e9e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -902,6 +902,10 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
       this.executor = executor2;
    }
 
+   public ConcurrentMap<SimpleString, ConcurrentMap<Long, Page>> 
getPageIndex() {
+      return pageIndex;
+   }
+
    /**
     * This is for tests basically, do not use it as its API is not guaranteed 
for future usage.
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 9bbbf46e3a..3906af597e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -778,6 +778,9 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       } finally {
          if (file.isOpen())
             file.close();
+         if (pageStore != null) {
+            sendReplicatePacket(new ReplicationPageEventMessage(pageStore, id, 
false, remotingConnection.isVersionUsingLongOnPageReplication()));
+         }
       }
    }
 
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java
index a2e25267b0..293138e186 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -70,7 +70,7 @@ public class HelperCreate extends HelperBase {
 
    private boolean clustered = false;
 
-   private boolean slave = false;
+   private boolean backup = false;
 
    private String staticCluster;
 
@@ -184,12 +184,12 @@ public class HelperCreate extends HelperBase {
       return this;
    }
 
-   public boolean isSlave() {
-      return slave;
+   public boolean isBackup() {
+      return backup;
    }
 
-   public HelperCreate setSlave(boolean slave) {
-      this.slave = slave;
+   public HelperCreate setBackup(boolean backup) {
+      this.backup = backup;
       return this;
    }
 
@@ -301,8 +301,8 @@ public class HelperCreate extends HelperBase {
          add(listCommands, "--no-web");
       }
 
-      if (slave) {
-         add(listCommands, "--slave");
+      if (backup) {
+         add(listCommands, "--backup");
       }
 
       if (replicated) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
index 36deb2ddea..65277e97e1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
@@ -16,13 +16,34 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import 
org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
 import 
org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
 import 
org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NettyReplicatedFailoverTest extends NettyFailoverTest {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    @Override
    protected TestableServer createTestableServer(Configuration config) {
       return new SameProcessActiveMQServer(createServer(true, config));
@@ -49,4 +70,71 @@ public class NettyReplicatedFailoverTest extends 
NettyFailoverTest {
    protected final void crash(ClientSession... sessions) throws Exception {
       crash(true, sessions);
    }
+
+   @Test
+   public void testPagedInSync() throws Exception {
+
+      String queueName = "testPagedInSync";
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+      try (Connection conn = factory.createConnection()) {
+         Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
primaryServer.getServer().locateQueue(queueName);
+         Assert.assertNotNull(serverQueue);
+
+         serverQueue.getPagingStore().startPaging();
+
+         for (int i = 0; i < 50; i++) {
+            producer.send(session.createTextMessage("hello"));
+            session.commit();
+            serverQueue.getPagingStore().forceAnotherPage();
+         }
+         backupServer.stop();
+         backupServer.start();
+         Wait.assertTrue(backupServer.getServer()::isReplicaSync);
+
+         SharedNothingBackupActivation activation = 
(SharedNothingBackupActivation) backupServer.getServer().getActivation();
+         Map<Long, Page> currentPages = 
activation.getReplicationEndpoint().getPageIndex().get(SimpleString.toSimpleString(queueName));
+
+         logger.info("There are {} page files open", currentPages.size());
+         Wait.assertTrue(() -> currentPages.size() <= 1, 10_000);
+
+         producer.send(session.createTextMessage("on currentPage"));
+         session.commit();
+
+         PagingStore store = 
primaryServer.getServer().getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
+         Page currentPage = store.getCurrentPage();
+         logger.info("Page {}", currentPage.getPageId());
+
+         Page depaged = null;
+         for (; ; ) {
+            depaged = store.depage();
+            if (depaged == null || currentPage.getPageId() == 
depaged.getPageId()) {
+               break;
+            }
+            logger.info("depage :: {} and currentPageID={}", 
depaged.getPageId(), currentPage.getPageId());
+         }
+
+         Assert.assertNotNull(depaged);
+
+         logger.info("Depaged:: {}", depaged.getPageId());
+
+         for (int i = 0;  i < 10; i++) {
+            producer.send(session.createTextMessage("on current page"));
+            session.commit();
+            store.depage();
+         }
+
+         logger.info("Size:: {}", currentPages.size());
+
+         Wait.assertTrue(() -> currentPages.size() <= 1, 10_000);
+
+      }
+   }
+
 }
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java
index 3a13b4d3fa..e0b5bbdae7 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java
@@ -69,13 +69,13 @@ public class CheckTest extends SmokeTestBase {
 
       {
          HelperCreate cliCreateServer = new HelperCreate();
-         
cliCreateServer.setSharedStore(true).setSlave(false).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61716").setArtemisInstance(server0Location);
+         
cliCreateServer.setSharedStore(true).setBackup(false).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61716").setArtemisInstance(server0Location);
          cliCreateServer.createServer();
       }
 
       {
          HelperCreate cliCreateServer = new HelperCreate();
-         
cliCreateServer.setSharedStore(true).setSlave(true).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61616").setPortOffset(100).setArtemisInstance(server1Location);
+         
cliCreateServer.setSharedStore(true).setBackup(true).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61616").setPortOffset(100).setArtemisInstance(server1Location);
          cliCreateServer.createServer();
       }
    }
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java
new file mode 100644
index 0000000000..3a1cea632b
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicatedMirrorTargetTest extends SoakTestBase {
+
+   private static final String TEST_NAME = "REPLICATED_MIRROR_SOAK";
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   // Set this to true and log4j will be configured with some relevant log.trace for the ReplicationEndpoint at the server
+   private static final boolean TRACE_LOGS = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", 
"false"));
+   private static final int NUMBER_MESSAGES = 
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_000);
+
+   // By default all messages are consumed; override NUMBER_MESSAGES_RECEIVE to consume fewer
+   private static final int NUMBER_MESSAGES_RECEIVE = 
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES_RECEIVE", 2_000);
+   private static final int RECEIVE_COMMIT = 
TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 200);
+   private static final int SEND_COMMIT = 
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 200);
+
+   // A value of -1 means the target broker is never killed
+   private static final int KILL_INTERVAL =  
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 1_000);
+   private static final int SNF_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 300_000);
+   private static final int GENERAL_WAIT_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
+
+   /*
+    * Time each consumer takes to process a message received to allow some 
messages accumulating.
+    * This sleep happens right before the commit.
+    */
+   private static final int CONSUMER_PROCESSING_TIME = 
TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
+
+   private static final String TOPIC_NAME = "topicTest";
+
+   private static String body;
+
+   static {
+      StringWriter writer = new StringWriter();
+      while (writer.getBuffer().length() < 30 * 1024) {
+         writer.append("The sky is blue, ..... watch out for poop from the 
birds though!...");
+      }
+      body = writer.toString();
+   }
+
+   public static final String DC1_NODE = "ReplicatedMirrorTargetTest/DC1";
+   public static final String DC2_NODE = "ReplicatedMirrorTargetTest/DC2";
+   public static final String DC2_REPLICA_NODE = 
"ReplicatedMirrorTargetTest/DC2_REPLICA";
+
+   volatile Process processDC1;
+   volatile Process processDC2;
+   volatile Process processDC2_REPLICA;
+
+   @After
+   public void destroyServers() throws Exception {
+      if (processDC1 != null) {
+         processDC1.destroyForcibly();
+         processDC1.waitFor(1, TimeUnit.MINUTES);
+         processDC1 = null;
+      }
+      if (processDC2 != null) {
+         processDC2.destroyForcibly();
+         processDC2.waitFor(1, TimeUnit.MINUTES);
+         processDC2 = null;
+      }
+
+      if (processDC2_REPLICA != null) {
+         processDC2_REPLICA.destroyForcibly();
+         processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+         processDC2_REPLICA = null;
+      }
+
+   }
+
+   private static final String DC1_URI = "tcp://localhost:61616";
+   private static final String DC2_URI = "tcp://localhost:61618";
+
+   private static final String DC2_REPLICA_URI = "tcp://localhost:61619";
+
+   private static void createServer(String serverName,
+                                    String connectionName,
+                                    String mirrorURI,
+                                    int porOffset,
+                                    boolean paging,
+                                    boolean replicated,
+                                    String clusterStatic) throws Exception {
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setNoWeb(false);
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", 
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", 
DC1_NODE);
+      cliCreateServer.addArgs("--addresses", TOPIC_NAME);
+      cliCreateServer.setPortOffset(porOffset);
+      if (replicated) {
+         cliCreateServer.setReplicated(true);
+         cliCreateServer.setStaticCluster(clusterStatic);
+         cliCreateServer.setClustered(true);
+      } else {
+         cliCreateServer.setClustered(false);
+      }
+
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties();
+      brokerProperties.put("AMQPConnections." + connectionName + ".uri", 
mirrorURI);
+      brokerProperties.put("AMQPConnections." + connectionName + 
".retryInterval", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections." + connectionName + 
".connectionElements.mirror.sync", "false");
+      brokerProperties.put("largeMessageSync", "false");
+
+      if (paging) {
+         brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+         brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+         brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+         brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+      }
+      // if we don't use pageTransactions we may eventually get a few 
duplicates
+      brokerProperties.put("mirrorPageTransaction", "true");
+      File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+
+      File brokerXml = new File(serverLocation, "/etc/broker.xml");
+      Assert.assertTrue(brokerXml.exists());
+      // Adding redistribution delay to broker configuration
+      Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
SimpleMirrorSoakTest.java --> \n"));
+
+      if (TRACE_LOGS) {
+         replaceLogs(serverLocation);
+      }
+
+   }
+
+   private static void replaceLogs(File serverLocation) throws Exception {
+      File log4j = new File(serverLocation, "/etc/log4j2.properties");
+      Assert.assertTrue(FileUtil.findReplace(log4j, 
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" +
+         "\n" + 
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n"
+         + "logger.endpoint.level=DEBUG\n"
+         + "appender.console.filter.threshold.type = ThresholdFilter\n"
+         + "appender.console.filter.threshold.level = info"));
+   }
+
+   private static void createBackupServer(String serverName,
+                                    int porOffset,
+                                    String clusterStatic) throws Exception {
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+      cliCreateServer.setNoWeb(false);
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", 
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", 
DC1_NODE);
+      cliCreateServer.addArgs("--addresses", TOPIC_NAME);
+      cliCreateServer.setPortOffset(porOffset);
+      cliCreateServer.setClustered(true);
+      cliCreateServer.setReplicated(true);
+      cliCreateServer.setBackup(true);
+      cliCreateServer.setStaticCluster(clusterStatic);
+      cliCreateServer.createServer();
+
+      File brokerXml = new File(serverLocation, "/etc/broker.xml");
+      Assert.assertTrue(brokerXml.exists());
+      // Adding redistribution delay to broker configuration
+      Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
SimpleMirrorSoakTest.java --> \n"));
+
+      if (TRACE_LOGS) {
+         replaceLogs(serverLocation);
+      }
+   }
+
+   public static void createRealServers(boolean paging) throws Exception {
+      createServer(DC1_NODE, "mirror", DC2_URI, 0, paging, false, null);
+      createServer(DC2_NODE, "mirror", DC1_URI, 2, paging, true, 
DC2_REPLICA_URI);
+      createBackupServer(DC2_REPLICA_NODE, 3, DC2_URI);
+   }
+
+   private void startServers() throws Exception {
+      processDC1 = startServer(DC1_NODE, -1, -1, new 
File(getServerLocation(DC1_NODE), "broker.properties"));
+      processDC2 = startServer(DC2_NODE, -1, -1, new 
File(getServerLocation(DC2_NODE), "broker.properties"));
+
+      ServerUtil.waitForServerToStart(0, 10_000);
+      ServerUtil.waitForServerToStart(2, 10_000);
+
+      processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new 
File(getServerLocation(DC2_NODE), "broker.properties"));
+
+      Thread.sleep(5000);
+   }
+
+   @Test
+   public void testMirrorOnReplica() throws Exception {
+      createRealServers(true);
+      startServers();
+
+
+      Assert.assertTrue(KILL_INTERVAL > SEND_COMMIT || KILL_INTERVAL < 0);
+
+      String clientIDA = "nodeA";
+      String clientIDB = "nodeB";
+      String subscriptionID = "my-order";
+      String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+      ConnectionFactory connectionFactoryDC1A = 
CFUtil.createConnectionFactory("amqp", DC1_URI);
+
+      consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, 
false, RECEIVE_COMMIT);
+      consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, 
false, RECEIVE_COMMIT);
+
+      SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null, 
null);
+      SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null, 
null);
+
+      runAfter(() -> managementDC1.close());
+      runAfter(() -> managementDC2.close());
+
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + 
subscriptionID));
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + 
subscriptionID));
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + 
subscriptionID));
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + 
subscriptionID));
+
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      runAfter(executorService::shutdownNow);
+      CountDownLatch consumerDone = new CountDownLatch(2);
+      executorService.execute(() -> {
+         try {
+            consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 
NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         } finally {
+            consumerDone.countDown();
+         }
+      });
+      executorService.execute(() -> {
+         try {
+            consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 
NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         } finally {
+            consumerDone.countDown();
+         }
+      });
+
+      OrderedExecutor restartExeuctor = new OrderedExecutor(executorService);
+      AtomicBoolean running = new AtomicBoolean(true);
+      runAfter(() -> running.set(false));
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createTopic(TOPIC_NAME));
+         for (int i = 0; i < NUMBER_MESSAGES; i++) {
+            TextMessage message = session.createTextMessage(body);
+            message.setIntProperty("i", i);
+            message.setBooleanProperty("large", false);
+            producer.send(message);
+            if (i > 0 && i % SEND_COMMIT == 0) {
+               logger.info("Sent {} messages", i);
+               session.commit();
+            }
+            if (KILL_INTERVAL > 0 && i > 0 && i % KILL_INTERVAL == 0) {
+               restartExeuctor.execute(() -> {
+                  if (running.get()) {
+                     try {
+                        logger.info("Restarting target server (DC2)");
+                        if (processDC2_REPLICA != null) {
+                           processDC2_REPLICA.destroyForcibly();
+                           processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+                           processDC2_REPLICA = null;
+                        }
+                        if (processDC2 != null) {
+                           processDC2.destroyForcibly();
+                           processDC2.waitFor(1, TimeUnit.MINUTES);
+                           processDC2 = null;
+                        }
+                        processDC2 = startServer(DC2_NODE, 2, 10_000, new 
File(getServerLocation(DC2_NODE), "broker.properties"));
+                        processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, 
-1);
+                     } catch (Exception e) {
+                        logger.warn(e.getMessage(), e);
+                     }
+                  }
+               });
+            }
+         }
+         session.commit();
+         running.set(false);
+      }
+
+      consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS);
+
+      Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), 
SNF_TIMEOUT);
+      Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), 
SNF_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC1, clientIDA + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC1, clientIDB + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC2, clientIDA + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> 
getCount(managementDC2, clientIDB + "." + subscriptionID), 
GENERAL_WAIT_TIMEOUT);
+
+      destroyServers();
+
+      // counting the number of records on duplicate cache
+      // to validate if ARTEMIS-4765 is fixed
+      ActiveMQServer server = createServer(true, false);
+      
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + 
"/data/journal");
+      
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + 
"/data/bindings");
+      server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) 
+ "/data/paging");
+      server.start();
+      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+      HashMap<Integer, AtomicInteger> records = 
countJournal(server.getConfiguration());
+      AtomicInteger duplicateRecordsCount = records.get((int) 
JournalRecordIds.DUPLICATE_ID);
+      Assert.assertNotNull(duplicateRecordsCount);
+      // 1000 credits by default
+      Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
+
+   }
+
+   private static void consume(ConnectionFactory factory,
+                               String clientID,
+                               String subscriptionID,
+                               int start,
+                               int numberOfMessages,
+                               boolean expectEmpty,
+                               boolean assertBody,
+                               int batchCommit) throws Exception {
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID(clientID);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         connection.start();
+         MessageConsumer consumer = session.createDurableConsumer(topic, 
subscriptionID);
+         boolean failed = false;
+
+         int pendingCommit = 0;
+
+         for (int i = start; i < start + numberOfMessages; i++) {
+            TextMessage message = (TextMessage) consumer.receive(10_000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {}, large={}", 
message.getIntProperty("i"), message.getBooleanProperty("large"));
+            if (message.getIntProperty("i") != i) {
+               failed = true;
+               logger.warn("Expected message {} but got {}", i, 
message.getIntProperty("i"));
+            }
+            logger.debug("Consumed {}, large={}", i, 
message.getBooleanProperty("large"));
+            pendingCommit++;
+            if (pendingCommit >= batchCommit) {
+               if (CONSUMER_PROCESSING_TIME > 0) {
+                  Thread.sleep(CONSUMER_PROCESSING_TIME);
+               }
+               logger.info("received {}", i);
+               session.commit();
+               pendingCommit = 0;
+            }
+         }
+         session.commit();
+
+         Assert.assertFalse(failed);
+
+         if (expectEmpty) {
+            Assert.assertNull(consumer.receiveNoWait());
+         }
+      }
+   }
+
+   public long getCount(SimpleManagement simpleManagement, String queue) {
+      try {
+         long value = simpleManagement.getMessageCountOnQueue(queue);
+         logger.info("Queue {} count = {}", queue, value);
+         return value;
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         return -1;
+      }
+   }
+}

Reply via email to