This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new ec633b3d9b [AMQ-9773] Fix for only one message being recovered from 
backup
ec633b3d9b is described below

commit ec633b3d9b0398cbe7814cd6bccf23f6c62fab15
Author: Matt Pavlovich <mattr...@apache.org>
AuthorDate: Wed Sep 24 18:02:12 2025 -0500

    [AMQ-9773] Fix for only one message being recovered from backup
---
 .../apache/activemq/store/kahadb/KahaDBStore.java  | 21 +++++++++---
 .../kahadb/KahaDBOffsetRecoveryListenerTest.java   | 37 +++++++++++++++-------
 2 files changed, 41 insertions(+), 17 deletions(-)

diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index f52c49421e..d0fa3bda84 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -754,6 +754,14 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
 
+                        /*
+                         The endSequenceOffset is used when only endMessageId 
is requested
+                         there is a disconnect between iterator offset and a 
destination's
+                         sequence key.
+
+                         If a destination has already processed messages, then 
the sequence key
+                         value is the number of total messages processed 
through the queue all-time.
+                         */
                         Long startSequenceOffset = null;
                         Long endSequenceOffset = null;
 
@@ -766,16 +774,15 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                         if(messageRecoveryContext.getEndMessageId() != null && 
!messageRecoveryContext.getEndMessageId().isBlank()) {
                             endSequenceOffset = 
Optional.ofNullable(sd.messageIdIndex.get(tx, 
messageRecoveryContext.getEndMessageId()))
                                                         
.orElse(startSequenceOffset + 
Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned()));
-                        } else {
-                            endSequenceOffset = startSequenceOffset + 
Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned());
+                            
messageRecoveryContext.setEndSequenceId(endSequenceOffset);
                         }
 
-                        if(endSequenceOffset < startSequenceOffset) {
+                        if(endSequenceOffset != null &&
+                            endSequenceOffset < startSequenceOffset) {
                             LOG.warn("Invalid offset parameters start:{} 
end:{}", startSequenceOffset, endSequenceOffset);
                             throw new IllegalStateException("Invalid offset 
parameters start:" + startSequenceOffset + " end:" + endSequenceOffset);
                         }
 
-                        
messageRecoveryContext.setEndSequenceId(endSequenceOffset);
                         Entry<Long, MessageKeys> entry = null;
                         recoverRolledBackAcks(destination.getPhysicalName(), 
sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), 
messageRecoveryContext);
                         Set<String> ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
@@ -796,7 +803,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                                 break;
                             }
                         }
-                        sd.orderIndex.stoppedIterating();
+
+                        // The sd.orderIndex uses the destination's cursor
+                        if(!messageRecoveryContext.isUseDedicatedCursor()) {
+                            sd.orderIndex.stoppedIterating();
+                        }
                     }
                 });
             } finally {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
index 1b3f1e8afa..613b620d72 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
@@ -32,7 +32,9 @@ import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,9 +54,17 @@ public class KahaDBOffsetRecoveryListenerTest {
     protected BrokerService brokerService = null;
     protected BrokerService restartBrokerService = null;
 
+    @Rule
+    public TestName testName = new TestName();
+
+    protected final int PRETEST_MSG_COUNT = 17531;
+
     @Before
     public void beforeEach() throws Exception {
-
+        // Send+Recv a odd number of messages beyond cache sizes
+        // to confirm the queue's sequence number gets pushed off
+        sendMessages(PRETEST_MSG_COUNT, testName.getMethodName());
+        assertEquals(Integer.valueOf(PRETEST_MSG_COUNT), 
Integer.valueOf(receiveMessages(testName.getMethodName())));
     }
 
     @After
@@ -68,7 +78,7 @@ public class KahaDBOffsetRecoveryListenerTest {
         broker.setUseJmx(false);
         broker.setPersistenceAdapter(kaha);
         broker.start();
-        broker.waitUntilStarted(10_000l);
+        broker.waitUntilStarted(10_000L);
         return broker;
     }
 
@@ -130,7 +140,7 @@ public class KahaDBOffsetRecoveryListenerTest {
 
         restartBrokerService = createBroker(createStore(false));
         restartBrokerService.start();
-        restartBrokerService.waitUntilStarted(30_000l);
+        restartBrokerService.waitUntilStarted(30_000L);
         assertEquals(sendCount, receiveMessages(queueName));
 
         restartBrokerService.stop();
@@ -139,42 +149,42 @@ public class KahaDBOffsetRecoveryListenerTest {
 
     @Test
     public void testOffsetZero() throws Exception {
-        runOffsetTest(1_000, 1_000, 0, 1, 1, 0, "TEST.OFFSET.ZERO");
+        runOffsetTest(1_000, 1_000, 0, 1, 1, 0, testName.getMethodName());
     }
 
     @Test
     public void testOffsetOne() throws Exception {
-        runOffsetTest(1_000, 1_000, 1, 1, 1, 1, "TEST.OFFSET.ONE");
+        runOffsetTest(1_000, 1_000, 1, 1, 1, 1, testName.getMethodName());
     }
 
     @Test
     public void testOffsetLastMinusOne() throws Exception {
-        runOffsetTest(1_000, 1_000, 999, 1, 1, 999, 
"TEST.OFFSET.LASTMINUSONE");
+        runOffsetTest(1_000, 1_000, 999, 1, 1, 999, testName.getMethodName());
     }
 
     @Test
     public void testOffsetLast() throws Exception {
-        runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, "TEST.OFFSET.LAST");
+        runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, testName.getMethodName());
     }
 
     @Test
     public void testOffsetBeyondQueueSizeNoError() throws Exception {
-        runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, "TEST.OFFSET.BEYOND");
+        runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, 
testName.getMethodName());
     }
 
     @Test
     public void testOffsetEmptyQueue() throws Exception {
-        runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY");
+        runOffsetTest(0, 0, 10_000, 1, 0, -1, testName.getMethodName());
     }
 
     @Test
     public void testOffsetWalk() throws Exception {
-        runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000, 
"TEST.OFFSET.WALK", 8, false);
+        runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000, 
testName.getMethodName(), 8, false);
     }
 
     @Test
     public void testOffsetRepeat() throws Exception {
-        runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, 
"TEST.OFFSET.REPEAT", 10, true);
+        runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, 
testName.getMethodName(), 10, true);
     }
 
     private void sendMessages(int count, String queueName) throws JMSException 
{
@@ -198,12 +208,15 @@ public class KahaDBOffsetRecoveryListenerTest {
     private int receiveMessages(String queueName) throws JMSException {
         int rc=0;
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        cf.setWatchTopicAdvisories(false);
+
         Connection connection = cf.createConnection();
+
         try {
             connection.start();
             Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             MessageConsumer messageConsumer = session.createConsumer(new 
ActiveMQQueue(queueName));
-            while ( messageConsumer.receive(1000) !=null ) {
+            while (messageConsumer.receive(1_000L) != null) {
                 rc++;
             }
             return rc;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to