This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 263a44e262 ARTEMIS-4421 Page counters should work before page rebuild is done
263a44e262 is described below

commit 263a44e262a6761b670842ba88dcae405946ec5e
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Wed Sep 6 19:01:07 2023 -0400

    ARTEMIS-4421 Page counters should work before page rebuild is done
---
 .../journal/AbstractJournalStorageManager.java     |  3 ++
 .../integration/paging/PagingCounterTest.java      | 51 ++++++++++++++++++++--
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 23800a5577..efa6d2832c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1234,6 +1234,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
                      if (sub != null) {
                         sub.getCounter().loadValue(record.id, 
encoding.getValue(), encoding.getPersistentSize());
+                        if (encoding.getValue() > 0) {
+                           sub.notEmpty();
+                        }
                      } else {
                         
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
                         messageJournal.tryAppendDeleteRecord(record.id, 
this::recordNotFoundCallback, false);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index 97b86f7e11..298749c66a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.activemq.artemis.tests.integration.paging;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.transaction.xa.Xid;
-
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@@ -37,13 +41,13 @@ import 
org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.After;
 import org.junit.Assert;
@@ -481,6 +485,45 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
    }
 
+
+   @Test
+   public void testSendNoRebuild() throws Exception {
+      Queue queue = server.createQueue(new QueueConfiguration(new 
SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
+
+      queue.getPagingStore().startPaging();
+
+      PageSubscriptionCounter counter = locateCounter(queue);
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue("A1"));
+         for (int i = 0; i < 3000; i++) {
+            producer.send(session.createTextMessage("i" + i));
+         }
+         session.commit();
+      }
+
+      server.stop();
+
+      server = newActiveMQServer();
+
+      server.setRebuildCounters(false);
+
+      server.start();
+
+      queue = server.locateQueue(new SimpleString("A1"));
+
+      assertNotNull(queue);
+
+      counter = locateCounter(queue);
+
+      logger.debug("Counter:: {}", queue.getMessageCount());
+
+      Wait.assertEquals(3000, counter::getValue);
+      Wait.assertEquals(3000L, queue::getMessageCount, 1000, 100);
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -495,9 +538,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       OperationContextImpl.clearContext();
 
-      ActiveMQServer server = super.createServer(true, false);
+      ActiveMQServer server = super.createServer(true, true);
 
-      AddressSettings defaultSetting = new 
AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
+      AddressSettings defaultSetting = new 
AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 
1024).setMaxReadPageMessages(10);
 
       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
 

Reply via email to