This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 263a44e262 ARTEMIS-4421 Page counters should work before page rebuild
is done
263a44e262 is described below
commit 263a44e262a6761b670842ba88dcae405946ec5e
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 6 19:01:07 2023 -0400
ARTEMIS-4421 Page counters should work before page rebuild is done
---
.../journal/AbstractJournalStorageManager.java | 3 ++
.../integration/paging/PagingCounterTest.java | 51 ++++++++++++++++++++--
2 files changed, 50 insertions(+), 4 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 23800a5577..efa6d2832c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1234,6 +1234,9 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
if (sub != null) {
sub.getCounter().loadValue(record.id,
encoding.getValue(), encoding.getPersistentSize());
+ if (encoding.getValue() > 0) {
+ sub.notEmpty();
+ }
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.tryAppendDeleteRecord(record.id,
this::recordNotFoundCallback, false);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index 97b86f7e11..298749c66a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.paging;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.transaction.xa.Xid;
-
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@@ -37,13 +41,13 @@ import
org.apache.activemq.artemis.core.persistence.StorageManager;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
@@ -481,6 +485,45 @@ public class PagingCounterTest extends ActiveMQTestBase {
}
+
+ @Test
+ public void testSendNoRebuild() throws Exception {
+ Queue queue = server.createQueue(new QueueConfiguration(new
SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
+
+ queue.getPagingStore().startPaging();
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue("A1"));
+ for (int i = 0; i < 3000; i++) {
+ producer.send(session.createTextMessage("i" + i));
+ }
+ session.commit();
+ }
+
+ server.stop();
+
+ server = newActiveMQServer();
+
+ server.setRebuildCounters(false);
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ logger.debug("Counter:: {}", queue.getMessageCount());
+
+ Wait.assertEquals(3000, counter::getValue);
+ Wait.assertEquals(3000L, queue::getMessageCount, 1000, 100);
+ }
+
@Override
@Before
public void setUp() throws Exception {
@@ -495,9 +538,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
OperationContextImpl.clearContext();
- ActiveMQServer server = super.createServer(true, false);
+ ActiveMQServer server = super.createServer(true, true);
- AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
+ AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 *
1024).setMaxReadPageMessages(10);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);