This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 355c600ea8 ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K
elements instead of amqpCredits
355c600ea8 is described below
commit 355c600ea8012c5f95c9428592740824b45d8b86
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 7 18:56:57 2024 -0400
ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K elements
instead of amqpCredits
---
.../artemis/core/postoffice/DuplicateIDCache.java | 4 ++
.../postoffice/impl/InMemoryDuplicateIDCache.java | 14 ++++-
.../core/postoffice/impl/NoOpDuplicateIDCache.java | 10 ++++
.../impl/PersistentDuplicateIDCache.java | 16 +++++-
.../core/postoffice/impl/PostOfficeImpl.java | 4 ++
.../persistence/ResizeDuplicateCacheTest.java | 64 ++++++++++++++++++++++
.../mirror/SingleMirrorSoakTest.java | 60 ++++++++++++++++----
7 files changed, 159 insertions(+), 13 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index 75cc17b7d3..731015f99f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -31,6 +31,10 @@ public interface DuplicateIDCache {
void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
+ int getSize(); // size this cache is configured with; the in-memory and persistent caches in this change return their cacheSize field
+
+ DuplicateIDCache resize(int newSize); // requests a new size; the in-memory and persistent caches in this change never go below their current cache.size()
+
/**
* it will add the data to the cache.
* If TX == null it won't use a transaction.
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
index 63c94d7123..79a7dec905 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -58,7 +58,7 @@ final class InMemoryDuplicateIDCache implements
DuplicateIDCache {
private int pos;
- private final int cacheSize;
+ private int cacheSize;
InMemoryDuplicateIDCache(final SimpleString address, final int size) {
this.address = address;
@@ -75,6 +75,18 @@ final class InMemoryDuplicateIDCache implements
DuplicateIDCache {
logger.debug("address = {} ignore loading ids: in memory cache won't
load previously stored ids", address);
}
+ @Override
+ public int getSize() {
+ return cacheSize; // configured cache size; no longer final, so resize(int) can change it
+ }
+
+ @Override
+ public synchronized DuplicateIDCache resize(int newSize) {
+ // grow-only: the stored size never drops below cache.size(), so nothing already cached is cut off
+ this.cacheSize = Math.max(cache.size(), newSize);
+ return this;
+ }
+
@Override
public void deleteFromCache(byte[] duplicateID) {
deleteFromCache(new ByteArray(duplicateID));
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
index 998074ed05..1eaccb858e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
@@ -70,6 +70,16 @@ public final class NoOpDuplicateIDCache implements
DuplicateIDCache {
}
+ @Override
+ public int getSize() {
+ return 0; // the no-op cache always reports a size of zero
+ }
+
+ @Override
+ public DuplicateIDCache resize(int newSize) {
+ return this; // nothing to resize; hand back this cache (never null) so callers can safely use the result
+ }
+
@Override
public List<Pair<byte[], Long>> getMap() {
return Collections.emptyList();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
index dcaed7c2fe..837077d752 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -62,7 +62,7 @@ final class PersistentDuplicateIDCache implements
DuplicateIDCache {
private int pos;
- private final int cacheSize;
+ private int cacheSize;
private final StorageManager storageManager;
@@ -173,6 +173,20 @@ final class PersistentDuplicateIDCache implements
DuplicateIDCache {
}
+ @Override
+ public int getSize() {
+ return cacheSize; // configured cache size; no longer final, so resize(int) can change it
+ }
+
+ @Override
+ public synchronized DuplicateIDCache resize(int newSize) {
+ // Grow-only: the stored size never drops below cache.size(), so entries already cached are not shrunk away
+ newSize = Math.max(cache.size(), newSize);
+ this.cacheSize = newSize;
+ logger.trace("newSize = {} after Math.max check", newSize);
+ return this;
+ }
+
private static String describeID(byte[] duplicateID) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" +
ByteUtil.toSimpleString(duplicateID);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1e21bb2639..c834cf6b8e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,6 +1464,10 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
+ if (cache.getSize() != cacheSizeToUse) {
+ cache.resize(cacheSizeToUse);
+ }
+
return cache;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
new file mode 100644
index 0000000000..f4df489e56
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResizeDuplicateCacheTest extends ActiveMQTestBase { // checks the duplicate-ID cache size and its journal record count after a server restart
+
+ @Test
+ public void testResizeCache() throws Exception {
+ int duplicateSize = 30; // cache size requested from the post office both before and after the restart
+ SimpleString randomString = RandomUtil.randomSimpleString();
+
+ ActiveMQServer server = createServer(true, false);
+ server.start();
+
+ DuplicateIDCache duplicateIDCache =
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
+
+ for (int i = 0; i < duplicateSize * 2; i++) { // add twice as many IDs as the requested cache size
+ duplicateIDCache.addToCache(("a" +
i).getBytes(StandardCharsets.UTF_8));
+ }
+
+ server.stop(); // restart; the cache is requested again below from the restarted server
+ server.start();
+
+ duplicateIDCache =
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
+
+ Assert.assertEquals(duplicateSize, duplicateIDCache.getSize()); // the cache obtained after the restart must report the requested size
+
+
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000); // compact before counting records (10_000 is presumably a timeout -- confirm)
+ HashMap<Integer, AtomicInteger> records =
countJournal(server.getConfiguration());
+
+ AtomicInteger duplicateRecordsCount = records.get((int)
JournalRecordIds.DUPLICATE_ID);
+ Assert.assertNotNull(duplicateRecordsCount);
+ Assert.assertEquals(duplicateSize, duplicateRecordsCount.get()); // only duplicateSize DUPLICATE_ID records may remain although 2 * duplicateSize IDs were added
+
+ server.stop();
+ }
+}
\ No newline at end of file
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
index c93d419ff9..74d1ff02c8 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
@@ -27,14 +27,18 @@ import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
@@ -62,6 +66,13 @@ public class SingleMirrorSoakTest extends SoakTestBase {
private static final int SEND_COMMIT =
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
private static final int KILL_INTERNAL =
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
private static final int SNF_TIMEOUT =
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
+ private static final int GENERAL_WAIT_TIMEOUT =
TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
+
+ /*
+ * Artificial delay, in milliseconds, that each consumer sleeps right before committing a batch,
+ * so that some messages can accumulate. The default of 0 disables the sleep.
+ */
+ private static final int CONSUMER_PROCESSING_TIME =
TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
private static final String TOPIC_NAME = "topicTest";
@@ -82,12 +93,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
volatile Process processDC2;
@After
- public void destroyServers() {
+ public void destroyServers() throws Exception { // also called directly by the test before it opens the DC2 journal
if (processDC1 != null) {
processDC1.destroyForcibly();
+ processDC1.waitFor(1, TimeUnit.MINUTES); // give the killed process up to one minute to exit
+ processDC1 = null; // clear the handle so a later call skips this process
}
if (processDC2 != null) {
processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES); // give the killed process up to one minute to exit
+ processDC2 = null; // clear the handle so a later call skips this process
}
}
@@ -121,6 +136,15 @@ public class SingleMirrorSoakTest extends SoakTestBase {
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("mirrorAckManagerPageAttempts", "10");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+
+ if (paging) {
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages",
"100000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages",
"10000");
+ // uncomment the line below to use the workaround (an explicit iDCacheSize) instead of relying on the fix in PostOfficeImpl
+ // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
+ }
// if we don't use pageTransactions we may eventually get a few
duplicates
brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
@@ -130,11 +154,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
SimpleMirrorSoakTest.java --> \n"));
- if (paging) {
- Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-size-messages>-1</max-size-messages>",
"<max-size-messages>1</max-size-messages>"));
- Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-read-page-bytes>20M</max-read-page-bytes>",
"<max-read-page-bytes>-1</max-read-page-bytes>"));
- Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-read-page-messages>-1</max-read-page-messages>",
"<max-read-page-messages>100000</max-read-page-messages>\n" + "
<prefetch-page-messages>10000</prefetch-page-messages>"));
- }
if (TRACE_LOGS) {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
@@ -248,10 +267,27 @@ public class SingleMirrorSoakTest extends SoakTestBase {
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue),
SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue),
SNF_TIMEOUT);
- Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." +
subscriptionID), 10_000);
- Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." +
subscriptionID), 10_000);
- Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." +
subscriptionID), 10_000);
- Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
+
+ destroyServers();
+
+ // counting the number of records on duplicate cache
+ // to validate if ARTEMIS-4765 is fixed
+ ActiveMQServer server = createServer(true, false);
+
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) +
"/data/journal");
+
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) +
"/data/bindings");
+ server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE)
+ "/data/paging");
+ server.start();
+
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+ HashMap<Integer, AtomicInteger> records =
countJournal(server.getConfiguration());
+ AtomicInteger duplicateRecordsCount = records.get((int)
JournalRecordIds.DUPLICATE_ID);
+ Assert.assertNotNull(duplicateRecordsCount);
+ // 1000 credits by default
+ Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
+
}
private static void consume(ConnectionFactory factory,
@@ -283,6 +319,9 @@ public class SingleMirrorSoakTest extends SoakTestBase {
logger.debug("Consumed {}, large={}", i,
message.getBooleanProperty("large"));
pendingCommit++;
if (pendingCommit >= batchCommit) {
+ if (CONSUMER_PROCESSING_TIME > 0) {
+ Thread.sleep(CONSUMER_PROCESSING_TIME);
+ }
logger.info("received {}", i);
session.commit();
pendingCommit = 0;
@@ -301,7 +340,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
public long getCount(SimpleManagement simpleManagement, String queue)
throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
- logger.debug("count on queue {} is {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);