This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new abb1a7739e Reverting ARTEMIS-4765
abb1a7739e is described below
commit abb1a7739eca6f7961a3421cda12d57252168f19
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 7 22:09:24 2024 -0400
Reverting ARTEMIS-4765
I am gettig a few test failures when running the server in soak with
Mirroring.
I will reapply the changes after some testing.
---
.../artemis/core/postoffice/DuplicateIDCache.java | 4 --
.../postoffice/impl/InMemoryDuplicateIDCache.java | 14 +----
.../core/postoffice/impl/NoOpDuplicateIDCache.java | 10 ----
.../impl/PersistentDuplicateIDCache.java | 16 +-----
.../core/postoffice/impl/PostOfficeImpl.java | 4 --
.../persistence/ResizeDuplicateCacheTest.java | 64 ----------------------
.../mirror/SingleMirrorSoakTest.java | 60 ++++----------------
7 files changed, 13 insertions(+), 159 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index 731015f99f..75cc17b7d3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -31,10 +31,6 @@ public interface DuplicateIDCache {
void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
- int getSize();
-
- DuplicateIDCache resize(int newSize);
-
/**
* it will add the data to the cache.
* If TX == null it won't use a transaction.
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
index 79a7dec905..63c94d7123 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -58,7 +58,7 @@ final class InMemoryDuplicateIDCache implements
DuplicateIDCache {
private int pos;
- private int cacheSize;
+ private final int cacheSize;
InMemoryDuplicateIDCache(final SimpleString address, final int size) {
this.address = address;
@@ -75,18 +75,6 @@ final class InMemoryDuplicateIDCache implements
DuplicateIDCache {
logger.debug("address = {} ignore loading ids: in memory cache won't
load previously stored ids", address);
}
- @Override
- public int getSize() {
- return cacheSize;
- }
-
- @Override
- public synchronized DuplicateIDCache resize(int newSize) {
- newSize = Math.max(cache.size(), newSize);
- this.cacheSize = newSize;
- return this;
- }
-
@Override
public void deleteFromCache(byte[] duplicateID) {
deleteFromCache(new ByteArray(duplicateID));
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
index 1eaccb858e..998074ed05 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
@@ -70,16 +70,6 @@ public final class NoOpDuplicateIDCache implements
DuplicateIDCache {
}
- @Override
- public int getSize() {
- return 0;
- }
-
- @Override
- public DuplicateIDCache resize(int newSize) {
- return null;
- }
-
@Override
public List<Pair<byte[], Long>> getMap() {
return Collections.emptyList();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
index 837077d752..dcaed7c2fe 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -62,7 +62,7 @@ final class PersistentDuplicateIDCache implements
DuplicateIDCache {
private int pos;
- private int cacheSize;
+ private final int cacheSize;
private final StorageManager storageManager;
@@ -173,20 +173,6 @@ final class PersistentDuplicateIDCache implements
DuplicateIDCache {
}
- @Override
- public int getSize() {
- return cacheSize;
- }
-
- @Override
- public synchronized DuplicateIDCache resize(int newSize) {
- // We won't be shrinking items here
- newSize = Math.max(cache.size(), newSize);
- this.cacheSize = newSize;
- logger.trace("newSize = {} after math.min check", newSize);
- return this;
- }
-
private static String describeID(byte[] duplicateID) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" +
ByteUtil.toSimpleString(duplicateID);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c834cf6b8e..1e21bb2639 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,10 +1464,6 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
- if (cache.getSize() != cacheSizeToUse) {
- cache.resize(cacheSizeToUse);
- }
-
return cache;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
deleted file mode 100644
index f4df489e56..0000000000
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.persistence;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ResizeDuplicateCacheTest extends ActiveMQTestBase {
-
- @Test
- public void testResizeCache() throws Exception {
- int duplicateSize = 30;
- SimpleString randomString = RandomUtil.randomSimpleString();
-
- ActiveMQServer server = createServer(true, false);
- server.start();
-
- DuplicateIDCache duplicateIDCache =
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
-
- for (int i = 0; i < duplicateSize * 2; i++) {
- duplicateIDCache.addToCache(("a" +
i).getBytes(StandardCharsets.UTF_8));
- }
-
- server.stop();
- server.start();
-
- duplicateIDCache =
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
-
- Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());
-
-
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
- HashMap<Integer, AtomicInteger> records =
countJournal(server.getConfiguration());
-
- AtomicInteger duplicateRecordsCount = records.get((int)
JournalRecordIds.DUPLICATE_ID);
- Assert.assertNotNull(duplicateRecordsCount);
- Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());
-
- server.stop();
- }
-}
\ No newline at end of file
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
index 196bbc4767..c93d419ff9 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
@@ -27,18 +27,14 @@ import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
-import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
@@ -66,13 +62,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
private static final int SEND_COMMIT =
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
private static final int KILL_INTERNAL =
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
private static final int SNF_TIMEOUT =
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
- private static final int GENERAL_WAIT_TIMEOUT =
TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
-
- /*
- * Time each consumer takes to process a message received to allow some
messages accumulating.
- * This sleep happens right before the commit.
- */
- private static final int CONSUMER_PROCESSING_TIME =
TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
private static final String TOPIC_NAME = "topicTest";
@@ -93,16 +82,12 @@ public class SingleMirrorSoakTest extends SoakTestBase {
volatile Process processDC2;
@After
- public void destroyServers() throws Exception {
+ public void destroyServers() {
if (processDC1 != null) {
processDC1.destroyForcibly();
- processDC1.waitFor(1, TimeUnit.MINUTES);
- processDC1 = null;
}
if (processDC2 != null) {
processDC2.destroyForcibly();
- processDC2.waitFor(1, TimeUnit.MINUTES);
- processDC2 = null;
}
}
@@ -136,15 +121,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("mirrorAckManagerPageAttempts", "10");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
-
- if (paging) {
- brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
- brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
- brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
- brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
- // un-comment this line if you want to rather use the work around
without the fix on the PostOfficeImpl
- // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
- }
// if we don't use pageTransactions we may eventually get a few
duplicates
brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
@@ -154,6 +130,11 @@ public class SingleMirrorSoakTest extends SoakTestBase {
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
SimpleMirrorSoakTest.java --> \n"));
+ if (paging) {
+ Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-size-messages>-1</max-size-messages>",
"<max-size-messages>1</max-size-messages>"));
+ Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-read-page-bytes>20M</max-read-page-bytes>",
"<max-read-page-bytes>-1</max-read-page-bytes>"));
+ Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-read-page-messages>-1</max-read-page-messages>",
"<max-read-page-messages>100000</max-read-page-messages>\n" + "
<prefetch-page-messages>10000</prefetch-page-messages>"));
+ }
if (TRACE_LOGS) {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
@@ -267,27 +248,10 @@ public class SingleMirrorSoakTest extends SoakTestBase {
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue),
SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue),
SNF_TIMEOUT);
- Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
- Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
- Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
- Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." +
subscriptionID), GENERAL_WAIT_TIMEOUT);
-
- destroyServers();
-
- // counting the number of records on duplicate cache
- // to validate if ARTEMIS-4765 is fixed
- ActiveMQServer server = createServer(true, false);
-
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) +
"/data/journal");
-
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) +
"/data/bindings");
- server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE)
+ "/data/paging");
- server.start();
-
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
- HashMap<Integer, AtomicInteger> records =
countJournal(server.getConfiguration());
- AtomicInteger duplicateRecordsCount = records.get((int)
JournalRecordIds.DUPLICATE_ID);
- Assert.assertNotNull(duplicateRecordsCount);
- // 1000 credits by default
- Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
-
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." +
subscriptionID), 10_000);
}
private static void consume(ConnectionFactory factory,
@@ -319,9 +283,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
logger.debug("Consumed {}, large={}", i,
message.getBooleanProperty("large"));
pendingCommit++;
if (pendingCommit >= batchCommit) {
- if (CONSUMER_PROCESSING_TIME > 0) {
- Thread.sleep(CONSUMER_PROCESSING_TIME);
- }
logger.info("received {}", i);
session.commit();
pendingCommit = 0;
@@ -340,6 +301,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
public long getCount(SimpleManagement simpleManagement, String queue)
throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
+ logger.debug("count on queue {} is {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);