This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new abb1a7739e Reverting ARTEMIS-4765
abb1a7739e is described below

commit abb1a7739eca6f7961a3421cda12d57252168f19
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue May 7 22:09:24 2024 -0400

    Reverting ARTEMIS-4765
    
    I am getting a few test failures when running the server in soak with Mirroring.
    I will reapply the changes after some testing.
---
 .../artemis/core/postoffice/DuplicateIDCache.java  |  4 --
 .../postoffice/impl/InMemoryDuplicateIDCache.java  | 14 +----
 .../core/postoffice/impl/NoOpDuplicateIDCache.java | 10 ----
 .../impl/PersistentDuplicateIDCache.java           | 16 +-----
 .../core/postoffice/impl/PostOfficeImpl.java       |  4 --
 .../persistence/ResizeDuplicateCacheTest.java      | 64 ----------------------
 .../mirror/SingleMirrorSoakTest.java               | 60 ++++----------------
 7 files changed, 13 insertions(+), 159 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index 731015f99f..75cc17b7d3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -31,10 +31,6 @@ public interface DuplicateIDCache {
 
    void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
 
-   int getSize();
-
-   DuplicateIDCache resize(int newSize);
-
    /**
     * it will add the data to the cache.
     * If TX == null it won't use a transaction.
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
index 79a7dec905..63c94d7123 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -58,7 +58,7 @@ final class InMemoryDuplicateIDCache implements 
DuplicateIDCache {
 
    private int pos;
 
-   private int cacheSize;
+   private final int cacheSize;
 
    InMemoryDuplicateIDCache(final SimpleString address, final int size) {
       this.address = address;
@@ -75,18 +75,6 @@ final class InMemoryDuplicateIDCache implements 
DuplicateIDCache {
       logger.debug("address = {} ignore loading ids: in memory cache won't 
load previously stored ids", address);
    }
 
-   @Override
-   public int getSize() {
-      return cacheSize;
-   }
-
-   @Override
-   public synchronized DuplicateIDCache resize(int newSize) {
-      newSize = Math.max(cache.size(), newSize);
-      this.cacheSize = newSize;
-      return this;
-   }
-
    @Override
    public void deleteFromCache(byte[] duplicateID) {
       deleteFromCache(new ByteArray(duplicateID));
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
index 1eaccb858e..998074ed05 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java
@@ -70,16 +70,6 @@ public final class NoOpDuplicateIDCache implements 
DuplicateIDCache {
 
    }
 
-   @Override
-   public int getSize() {
-      return 0;
-   }
-
-   @Override
-   public DuplicateIDCache resize(int newSize) {
-      return null;
-   }
-
    @Override
    public List<Pair<byte[], Long>> getMap() {
       return Collections.emptyList();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
index 837077d752..dcaed7c2fe 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -62,7 +62,7 @@ final class PersistentDuplicateIDCache implements 
DuplicateIDCache {
 
    private int pos;
 
-   private int cacheSize;
+   private final int cacheSize;
 
    private final StorageManager storageManager;
 
@@ -173,20 +173,6 @@ final class PersistentDuplicateIDCache implements 
DuplicateIDCache {
 
    }
 
-   @Override
-   public int getSize() {
-      return cacheSize;
-   }
-
-   @Override
-   public synchronized DuplicateIDCache resize(int newSize) {
-      // We won't be shrinking items here
-      newSize = Math.max(cache.size(), newSize);
-      this.cacheSize = newSize;
-      logger.trace("newSize = {} after math.min check", newSize);
-      return this;
-   }
-
    private static String describeID(byte[] duplicateID) {
       return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + 
ByteUtil.toSimpleString(duplicateID);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c834cf6b8e..1e21bb2639 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,10 +1464,6 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          }
       }
 
-      if (cache.getSize() != cacheSizeToUse) {
-         cache.resize(cacheSizeToUse);
-      }
-
       return cache;
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
deleted file mode 100644
index f4df489e56..0000000000
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.persistence;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ResizeDuplicateCacheTest extends ActiveMQTestBase {
-
-   @Test
-   public void testResizeCache() throws Exception {
-      int duplicateSize = 30;
-      SimpleString randomString = RandomUtil.randomSimpleString();
-
-      ActiveMQServer server = createServer(true, false);
-      server.start();
-
-      DuplicateIDCache duplicateIDCache = 
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
-
-      for (int i = 0; i < duplicateSize * 2; i++) {
-         duplicateIDCache.addToCache(("a" + 
i).getBytes(StandardCharsets.UTF_8));
-      }
-
-      server.stop();
-      server.start();
-
-      duplicateIDCache = 
server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
-
-      Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());
-
-      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
-      HashMap<Integer, AtomicInteger> records = 
countJournal(server.getConfiguration());
-
-      AtomicInteger duplicateRecordsCount = records.get((int) 
JournalRecordIds.DUPLICATE_ID);
-      Assert.assertNotNull(duplicateRecordsCount);
-      Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());
-
-      server.stop();
-   }
-}
\ No newline at end of file
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
index 196bbc4767..c93d419ff9 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
@@ -27,18 +27,14 @@ import javax.jms.Topic;
 import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
-import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.util.ServerUtil;
@@ -66,13 +62,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    private static final int SEND_COMMIT = 
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
    private static final int KILL_INTERNAL =  
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
    private static final int SNF_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
-   private static final int GENERAL_WAIT_TIMEOUT =  
TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
-
-   /*
-    * Time each consumer takes to process a message received to allow some 
messages accumulating.
-    * This sleep happens right before the commit.
-    */
-   private static final int CONSUMER_PROCESSING_TIME = 
TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
 
    private static final String TOPIC_NAME = "topicTest";
 
@@ -93,16 +82,12 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    volatile Process processDC2;
 
    @After
-   public void destroyServers() throws Exception {
+   public void destroyServers() {
       if (processDC1 != null) {
          processDC1.destroyForcibly();
-         processDC1.waitFor(1, TimeUnit.MINUTES);
-         processDC1 = null;
       }
       if (processDC2 != null) {
          processDC2.destroyForcibly();
-         processDC2.waitFor(1, TimeUnit.MINUTES);
-         processDC2 = null;
       }
 
    }
@@ -136,15 +121,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       brokerProperties.put("largeMessageSync", "false");
       brokerProperties.put("mirrorAckManagerPageAttempts", "10");
       brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
-
-      if (paging) {
-         brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
-         brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
-         brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
-         brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
-         // un-comment this line if you want to rather use the work around 
without the fix on the PostOfficeImpl
-         // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
-      }
       // if we don't use pageTransactions we may eventually get a few 
duplicates
       brokerProperties.put("mirrorPageTransaction", "true");
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
@@ -154,6 +130,11 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       Assert.assertTrue(brokerXml.exists());
       // Adding redistribution delay to broker configuration
       Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
SimpleMirrorSoakTest.java --> \n"));
+      if (paging) {
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-size-messages>-1</max-size-messages>", 
"<max-size-messages>1</max-size-messages>"));
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-read-page-bytes>20M</max-read-page-bytes>", 
"<max-read-page-bytes>-1</max-read-page-bytes>"));
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-read-page-messages>-1</max-read-page-messages>", 
"<max-read-page-messages>100000</max-read-page-messages>\n" + "            
<prefetch-page-messages>10000</prefetch-page-messages>"));
+      }
 
       if (TRACE_LOGS) {
          File log4j = new File(serverLocation, "/etc/log4j2.properties");
@@ -267,27 +248,10 @@ public class SingleMirrorSoakTest extends SoakTestBase {
 
       Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), 
SNF_TIMEOUT);
       Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), 
SNF_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + 
subscriptionID), GENERAL_WAIT_TIMEOUT);
-
-      destroyServers();
-
-      // counting the number of records on duplicate cache
-      // to validate if ARTEMIS-4765 is fixed
-      ActiveMQServer server = createServer(true, false);
-      
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + 
"/data/journal");
-      
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + 
"/data/bindings");
-      server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) 
+ "/data/paging");
-      server.start();
-      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
-      HashMap<Integer, AtomicInteger> records = 
countJournal(server.getConfiguration());
-      AtomicInteger duplicateRecordsCount = records.get((int) 
JournalRecordIds.DUPLICATE_ID);
-      Assert.assertNotNull(duplicateRecordsCount);
-      // 1000 credits by default
-      Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
-
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + 
subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + 
subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + 
subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + 
subscriptionID), 10_000);
    }
 
    private static void consume(ConnectionFactory factory,
@@ -319,9 +283,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
             logger.debug("Consumed {}, large={}", i, 
message.getBooleanProperty("large"));
             pendingCommit++;
             if (pendingCommit >= batchCommit) {
-               if (CONSUMER_PROCESSING_TIME > 0) {
-                  Thread.sleep(CONSUMER_PROCESSING_TIME);
-               }
                logger.info("received {}", i);
                session.commit();
                pendingCommit = 0;
@@ -340,6 +301,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    public long getCount(SimpleManagement simpleManagement, String queue) 
throws Exception {
       try {
          long value = simpleManagement.getMessageCountOnQueue(queue);
+         logger.debug("count on queue {} is {}", queue, value);
          return value;
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);

Reply via email to