This is an automated email from the ASF dual-hosted git repository. mattrpav pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push: new ec633b3d9b [AMQ-9773] Fix for only one message being recovered from backup ec633b3d9b is described below commit ec633b3d9b0398cbe7814cd6bccf23f6c62fab15 Author: Matt Pavlovich <mattr...@apache.org> AuthorDate: Wed Sep 24 18:02:12 2025 -0500 [AMQ-9773] Fix for only one message being recovered from backup --- .../apache/activemq/store/kahadb/KahaDBStore.java | 21 +++++++++--- .../kahadb/KahaDBOffsetRecoveryListenerTest.java | 37 +++++++++++++++------- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index f52c49421e..d0fa3bda84 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -754,6 +754,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); + /* + The endSequenceOffset is used when only endMessageId is requested + there is a disconnect between iterator offset and a destination's + sequence key. + + If a destination has already processed messages, then the sequence key + value is the number of total messages processed through the queue all-time. + */ Long startSequenceOffset = null; Long endSequenceOffset = null; @@ -766,16 +774,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, if(messageRecoveryContext.getEndMessageId() != null && !messageRecoveryContext.getEndMessageId().isBlank()) { endSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getEndMessageId())) .orElse(startSequenceOffset + Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned())); - } else { - endSequenceOffset = startSequenceOffset + Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned()); + messageRecoveryContext.setEndSequenceId(endSequenceOffset); } - if(endSequenceOffset < startSequenceOffset) { + if(endSequenceOffset != null && + endSequenceOffset < startSequenceOffset) { LOG.warn("Invalid offset parameters start:{} end:{}", startSequenceOffset, endSequenceOffset); throw new IllegalStateException("Invalid offset parameters start:" + startSequenceOffset + " end:" + endSequenceOffset); } - messageRecoveryContext.setEndSequenceId(endSequenceOffset); Entry<Long, MessageKeys> entry = null; recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), messageRecoveryContext); Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); @@ -796,7 +803,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, break; } } - sd.orderIndex.stoppedIterating(); + + // The sd.orderIndex uses the destination's cursor + if(!messageRecoveryContext.isUseDedicatedCursor()) { + sd.orderIndex.stoppedIterating(); + } } }); } finally { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java index 1b3f1e8afa..613b620d72 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java @@ -32,7 +32,9 @@ import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,9 +54,17 @@ public class KahaDBOffsetRecoveryListenerTest { protected BrokerService brokerService = null; protected BrokerService restartBrokerService = null; + @Rule + public TestName testName = new TestName(); + + protected final int PRETEST_MSG_COUNT = 17531; + @Before public void beforeEach() throws Exception { - + // Send+Recv a odd number of messages beyond cache sizes + // to confirm the queue's sequence number gets pushed off + sendMessages(PRETEST_MSG_COUNT, testName.getMethodName()); + assertEquals(Integer.valueOf(PRETEST_MSG_COUNT), Integer.valueOf(receiveMessages(testName.getMethodName()))); } @After @@ -68,7 +78,7 @@ public class KahaDBOffsetRecoveryListenerTest { broker.setUseJmx(false); broker.setPersistenceAdapter(kaha); broker.start(); - broker.waitUntilStarted(10_000l); + broker.waitUntilStarted(10_000L); return broker; } @@ -130,7 +140,7 @@ public class KahaDBOffsetRecoveryListenerTest { restartBrokerService = createBroker(createStore(false)); restartBrokerService.start(); - restartBrokerService.waitUntilStarted(30_000l); + restartBrokerService.waitUntilStarted(30_000L); assertEquals(sendCount, receiveMessages(queueName)); restartBrokerService.stop(); @@ -139,42 +149,42 @@ public class KahaDBOffsetRecoveryListenerTest { @Test public void testOffsetZero() throws Exception { - runOffsetTest(1_000, 1_000, 0, 1, 1, 0, "TEST.OFFSET.ZERO"); + runOffsetTest(1_000, 1_000, 0, 1, 1, 0, testName.getMethodName()); } @Test public void testOffsetOne() throws Exception { - runOffsetTest(1_000, 1_000, 1, 1, 1, 1, "TEST.OFFSET.ONE"); + runOffsetTest(1_000, 1_000, 1, 1, 1, 1, testName.getMethodName()); } @Test public void testOffsetLastMinusOne() throws Exception { - runOffsetTest(1_000, 1_000, 999, 1, 1, 999, "TEST.OFFSET.LASTMINUSONE"); + runOffsetTest(1_000, 1_000, 999, 1, 1, 999, testName.getMethodName()); } @Test public void testOffsetLast() throws Exception { - runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, "TEST.OFFSET.LAST"); + runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, testName.getMethodName()); } @Test public void testOffsetBeyondQueueSizeNoError() throws Exception { - runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, "TEST.OFFSET.BEYOND"); + runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, testName.getMethodName()); } @Test public void testOffsetEmptyQueue() throws Exception { - runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY"); + runOffsetTest(0, 0, 10_000, 1, 0, -1, testName.getMethodName()); } @Test public void testOffsetWalk() throws Exception { - runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000, "TEST.OFFSET.WALK", 8, false); + runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000, testName.getMethodName(), 8, false); } @Test public void testOffsetRepeat() throws Exception { - runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, "TEST.OFFSET.REPEAT", 10, true); + runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, testName.getMethodName(), 10, true); } private void sendMessages(int count, String queueName) throws JMSException { @@ -198,12 +208,15 @@ public class KahaDBOffsetRecoveryListenerTest { private int receiveMessages(String queueName) throws JMSException { int rc=0; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + cf.setWatchTopicAdvisories(false); + Connection connection = cf.createConnection(); + try { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(queueName)); - while ( messageConsumer.receive(1000) !=null ) { + while (messageConsumer.receive(1_000L) != null) { rc++; } return rc; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org For additional commands, e-mail: commits-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact