This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 355c600ea8 ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K 
elements instead of amqpCredits
355c600ea8 is described below

commit 355c600ea8012c5f95c9428592740824b45d8b86
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 7 18:56:57 2024 -0400

    ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K elements 
instead of amqpCredits
---
 .../artemis/core/postoffice/DuplicateIDCache.java  |  4 ++
 .../postoffice/impl/InMemoryDuplicateIDCache.java  | 14 ++++-
 .../core/postoffice/impl/NoOpDuplicateIDCache.java | 10 ++++
 .../impl/PersistentDuplicateIDCache.java           | 16 +++++-
 .../core/postoffice/impl/PostOfficeImpl.java       |  4 ++
 .../persistence/ResizeDuplicateCacheTest.java      | 64 ++++++++++++++++++++++
 .../mirror/SingleMirrorSoakTest.java               | 60 ++++++++++++++++----
 7 files changed, 159 insertions(+), 13 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index 75cc17b7d3..731015f99f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -31,6 +31,10 @@ public interface DuplicateIDCache {
 
    void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
 
+   int getSize();
+
+   DuplicateIDCache resize(int newSize);
+
    /**
     * it will add the data to the cache.
     * If TX == null it won't use a transaction.
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
index 63c94d7123..79a7dec905 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -58,7 +58,7 @@ final class InMemoryDuplicateIDCache implements 
DuplicateIDCache {
 
    private int pos;
 
-   private final int cacheSize;
+   private int cacheSize;
 
    InMemoryDuplicateIDCache(final SimpleString address, final int size) {
       this.address = address;
@@ -75,6 +75,18 @@ final class InMemoryDuplicateIDCache implements 
DuplicateIDCache {
       logger.debug("address = {} ignore loading ids: in memory cache won't 
load previously stored ids", address);
    }
 
+   @Override
+   public int getSize() {
+      return cacheSize;
+   }
+
+   @Override
+   public synchronized DuplicateIDCache resize(int newSize) {
+      newSize = Math.max(cache.size(), newSize);
+      this.cacheSize = newSize;
+      return this;
+   }
+
    @Override
    public void deleteFromCache(byte[] duplicateID) {
       deleteFromCache(new ByteArray(duplicateID));
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
index 998074ed05..1eaccb858e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
@@ -70,6 +70,16 @@ public final class NoOpDuplicateIDCache implements 
DuplicateIDCache {
 
    }
 
+   @Override
+   public int getSize() {
+      return 0;
+   }
+
+   @Override
+   public DuplicateIDCache resize(int newSize) {
+      return null;
+   }
+
    @Override
    public List<Pair<byte[], Long>> getMap() {
       return Collections.emptyList();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
index dcaed7c2fe..837077d752 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -62,7 +62,7 @@ final class PersistentDuplicateIDCache implements 
DuplicateIDCache {
 
    private int pos;
 
-   private final int cacheSize;
+   private int cacheSize;
 
    private final StorageManager storageManager;
 
@@ -173,6 +173,20 @@ final class PersistentDuplicateIDCache implements 
DuplicateIDCache {
 
    }
 
+   @Override
+   public int getSize() {
+      return cacheSize;
+   }
+
+   @Override
+   public synchronized DuplicateIDCache resize(int newSize) {
+      // We won't be shrinking items here
+      newSize = Math.max(cache.size(), newSize);
+      this.cacheSize = newSize;
+      logger.trace("newSize = {} after math.min check", newSize);
+      return this;
+   }
+
    private static String describeID(byte[] duplicateID) {
       return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + 
ByteUtil.toSimpleString(duplicateID);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1e21bb2639..c834cf6b8e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,6 +1464,10 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          }
       }
 
+      if (cache.getSize() != cacheSizeToUse) {
+         cache.resize(cacheSizeToUse);
+      }
+
       return cache;
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
new file mode 100644
index 0000000000..f4df489e56
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResizeDuplicateCacheTest extends ActiveMQTestBase {
+
+   @Test
+   public void testResizeCache() throws Exception {
+      int duplicateSize = 30;
+      SimpleString randomString = RandomUtil.randomSimpleString();
+
+      ActiveMQServer server = createServer(true, false);
+      server.start();
+
+      DuplicateIDCache duplicateIDCache = 
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
+
+      for (int i = 0; i < duplicateSize * 2; i++) {
+         duplicateIDCache.addToCache(("a" + 
i).getBytes(StandardCharsets.UTF_8));
+      }
+
+      server.stop();
+      server.start();
+
+      duplicateIDCache = 
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
+
+      Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());
+
+      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+      HashMap<Integer, AtomicInteger> records = 
countJournal(server.getConfiguration());
+
+      AtomicInteger duplicateRecordsCount = records.get((int) 
JournalRecordIds.DUPLICATE_ID);
+      Assert.assertNotNull(duplicateRecordsCount);
+      Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());
+
+      server.stop();
+   }
+}
\ No newline at end of file
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
index c93d419ff9..74d1ff02c8 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
@@ -27,14 +27,18 @@ import javax.jms.Topic;
 import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.util.ServerUtil;
@@ -62,6 +66,13 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    private static final int SEND_COMMIT = 
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
    private static final int KILL_INTERNAL =  
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
    private static final int SNF_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
+   private static final int GENERAL_WAIT_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
+
+   /*
+    * Time each consumer takes to process a message received to allow some 
messages accumulating.
+    * This sleep happens right before the commit.
+    */
+   private static final int CONSUMER_PROCESSING_TIME = 
TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
 
    private static final String TOPIC_NAME = "topicTest";
 
@@ -82,12 +93,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    volatile Process processDC2;
 
    @After
-   public void destroyServers() {
+   public void destroyServers() throws Exception {
       if (processDC1 != null) {
          processDC1.destroyForcibly();
+         processDC1.waitFor(1, TimeUnit.MINUTES);
+         processDC1 = null;
       }
       if (processDC2 != null) {
          processDC2.destroyForcibly();
+         processDC2.waitFor(1, TimeUnit.MINUTES);
+         processDC2 = null;
       }
 
    }
@@ -121,6 +136,15 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       brokerProperties.put("largeMessageSync", "false");
       brokerProperties.put("mirrorAckManagerPageAttempts", "10");
       brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+
+      if (paging) {
+         brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+         brokerProperties.put("addressSettings.#.maxReadPageMessages", 
"100000");
+         brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+         brokerProperties.put("addressSettings.#.prefetchPageMessages", 
"10000");
+         // un-comment this line if you want to rather use the work around 
without the fix on the PostOfficeImpl
+         // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
+      }
       // if we don't use pageTransactions we may eventually get a few 
duplicates
       brokerProperties.put("mirrorPageTransaction", "true");
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
@@ -130,11 +154,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       Assert.assertTrue(brokerXml.exists());
       // Adding redistribution delay to broker configuration
       Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
SimpleMirrorSoakTest.java --> \n"));
-      if (paging) {
-         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-size-messages>-1</max-size-messages>", 
"<max-size-messages>1</max-size-messages>"));
-         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-read-page-bytes>20M</max-read-page-bytes>", 
"<max-read-page-bytes>-1</max-read-page-bytes>"));
-         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-read-page-messages>-1</max-read-page-messages>", 
"<max-read-page-messages>100000</max-read-page-messages>\n" + "            
<prefetch-page-messages>10000</prefetch-page-messages>"));
-      }
 
       if (TRACE_LOGS) {
          File log4j = new File(serverLocation, "/etc/log4j2.properties");
@@ -248,10 +267,27 @@ public class SingleMirrorSoakTest extends SoakTestBase {
 
       Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), 
SNF_TIMEOUT);
       Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), 
SNF_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + 
subscriptionID), 10_000);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + 
subscriptionID), 10_000);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + 
subscriptionID), 10_000);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + 
subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
+
+      destroyServers();
+
+      // counting the number of records on duplicate cache
+      // to validate if ARTEMIS-4765 is fixed
+      ActiveMQServer server = createServer(true, false);
+      
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + 
"/data/journal");
+      
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + 
"/data/bindings");
+      server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) 
+ "/data/paging");
+      server.start();
+      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+      HashMap<Integer, AtomicInteger> records = 
countJournal(server.getConfiguration());
+      AtomicInteger duplicateRecordsCount = records.get((int) 
JournalRecordIds.DUPLICATE_ID);
+      Assert.assertNotNull(duplicateRecordsCount);
+      // 1000 credits by default
+      Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
+
    }
 
    private static void consume(ConnectionFactory factory,
@@ -283,6 +319,9 @@ public class SingleMirrorSoakTest extends SoakTestBase {
             logger.debug("Consumed {}, large={}", i, 
message.getBooleanProperty("large"));
             pendingCommit++;
             if (pendingCommit >= batchCommit) {
+               if (CONSUMER_PROCESSING_TIME > 0) {
+                  Thread.sleep(CONSUMER_PROCESSING_TIME);
+               }
                logger.info("received {}", i);
                session.commit();
                pendingCommit = 0;
@@ -301,7 +340,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    public long getCount(SimpleManagement simpleManagement, String queue) 
throws Exception {
       try {
          long value = simpleManagement.getMessageCountOnQueue(queue);
-         logger.debug("count on queue {} is {}", queue, value);
          return value;
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);

Reply via email to