This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new 1b50853071 [AMQ-9646] Support selecting specific messages for command line backup (#1377)
1b50853071 is described below

commit 1b50853071ba27f2571658fcbf88d429b7657e04
Author: Matt Pavlovich <[email protected]>
AuthorDate: Sat Feb 8 09:13:19 2025 -0600

    [AMQ-9646] Support selecting specific messages for command line backup (#1377)
    
    (cherry picked from commit 572414dbbc889adce11a020ff141d60391ac28e9)
---
 .../activemq/store/MessageRecoveryContext.java     | 186 +++++++++++++++++++++
 .../org/apache/activemq/store/MessageStore.java    |   8 +-
 .../apache/activemq/store/ProxyMessageStore.java   |   6 +-
 .../activemq/store/memory/MemoryMessageStore.java  |  28 +---
 .../activemq/store/MessageRecoveryContextTest.java | 137 +++++++++++++++
 .../console/command/store/StoreBackup.java         | 155 ++++++++++++-----
 .../activemq/store/jdbc/JDBCMessageStore.java      |  13 --
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  61 +++++--
 .../activemq/store/kahadb/TempKahaDBStore.java     |  32 ----
 .../region/cursors/StoreQueueCursorOrderTest.java  |   5 -
 .../kahadb/KahaDBOffsetRecoveryListenerTest.java   | 121 +++++++++++---
 11 files changed, 584 insertions(+), 168 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryContext.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryContext.java
new file mode 100644
index 0000000000..da2b198ee2
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryContext.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+
+public class MessageRecoveryContext implements MessageRecoveryListener {
+
+    public static final int DEFAULT_MAX_MESSAGE_COUNT_RETURNED = 100;
+    public static final boolean DEFAULT_USE_DEDICATED_CURSOR = true;
+
+    // Config
+    private final boolean useDedicatedCursor;
+    private final int maxMessageCountReturned;
+    private final Long offset;
+    private final String startMessageId;
+    private String endMessageId = null;
+    private final MessageRecoveryListener messageRecoveryListener;
+
+    // State
+    private Long endSequenceId = Long.MAX_VALUE;
+
+    // Stats
+    private final AtomicInteger recoveredCount = new AtomicInteger(0);
+
+    MessageRecoveryContext(final MessageRecoveryListener 
messageRecoveryListener, final String startMessageId, 
+            final String endMessageId, final Long offset, final Integer 
maxMessageCountReturned, 
+            final Boolean useDedicatedCursor) {
+        if(maxMessageCountReturned != null && maxMessageCountReturned < 0) {
+            throw new IllegalArgumentException("maxMessageCountReturned must 
be a positive integer value");
+        }
+        if(messageRecoveryListener == null) {
+            throw new IllegalArgumentException("MessageRecoveryListener must 
be specified");
+        }
+        if(offset != null) {
+            if(offset < 0L) {
+                throw new IllegalArgumentException("offset must be a 
non-negative integer value");
+            }
+            if(startMessageId != null) { 
+                throw new IllegalArgumentException("Only one of offset and 
startMessageId may be specified");
+            }
+        }
+        this.endMessageId = endMessageId;
+        this.maxMessageCountReturned = (maxMessageCountReturned != null ? 
maxMessageCountReturned : DEFAULT_MAX_MESSAGE_COUNT_RETURNED);
+        this.messageRecoveryListener = messageRecoveryListener;
+        this.offset = offset;
+        this.startMessageId = startMessageId;
+        this.useDedicatedCursor = (useDedicatedCursor != null ? 
useDedicatedCursor : DEFAULT_USE_DEDICATED_CURSOR);
+    }
+
+    public boolean isUseDedicatedCursor() {
+        return this.useDedicatedCursor;
+    }
+
+    public int getMaxMessageCountReturned() {
+        return this.maxMessageCountReturned;
+    }
+
+    public Long getOffset() {
+        return this.offset;
+    }
+
+    public String getEndMessageId() {
+        return this.endMessageId;
+    }
+
+    public String getStartMessageId() {
+        return this.startMessageId;
+    }
+
+    public MessageRecoveryListener getMessageRecoveryListener() {
+        return this.messageRecoveryListener;
+    }
+
+    // MessageRecoveryContext functions
+    public void setEndSequenceId(long endSequenceId) {
+        this.endSequenceId = endSequenceId;
+    }
+
+    public boolean canRecoveryNextMessage(Long sequenceId) {
+        if (getRecoveredCount() >= getMaxMessageCountReturned() || 
+            !this.messageRecoveryListener.canRecoveryNextMessage() ||
+            sequenceId >= endSequenceId) {
+            return false;
+        }
+        return true;
+    }
+
+    // MessageRecoveryListener functions
+    public boolean recoverMessage(Message message) throws Exception {
+        boolean tmpReturned = 
this.messageRecoveryListener.recoverMessage(message);
+        if(tmpReturned) {
+            this.recoveredCount.incrementAndGet();
+        }
+        return tmpReturned;
+    }
+
+    @Override
+    public boolean recoverMessageReference(MessageId ref) throws Exception {
+        return this.messageRecoveryListener.recoverMessageReference(ref);
+    }
+
+    @Override
+    public boolean hasSpace() {
+        return this.messageRecoveryListener.hasSpace();
+    }
+
+    @Override
+    public boolean isDuplicate(MessageId ref) {
+        return this.messageRecoveryListener.isDuplicate(ref);
+    }
+
+    // Metrics
+    public int getRecoveredCount() {
+        return this.recoveredCount.get();
+    }
+
+    @Override
+    public String toString() {
+        return "MessageRecoveryContext [useDedicatedCursor=" + 
useDedicatedCursor + ", maxMessageCountReturned="
+                + maxMessageCountReturned + ", offset=" + offset + ", 
startMessageId=" + startMessageId
+                + ", endMessageId=" + endMessageId + ", 
messageRecoveryListener=" + messageRecoveryListener
+                + ", endSequenceId=" + endSequenceId + ", recoveredCount=" + 
recoveredCount + "]";
+    }
+
+    public static class Builder {
+
+        private Boolean useDedicatedCursor;
+        private Integer maxMessageCountReturned;
+        private Long offset;
+        private String startMessageId;
+        private String endMessageId;
+        private MessageRecoveryListener messageRecoveryListener;
+
+        public Builder useDedicatedCursor(final boolean useDedicatedCursor) {
+            this.useDedicatedCursor = useDedicatedCursor;
+            return this;
+        }
+
+        public Builder maxMessageCountReturned(final int 
maxMessageCountReturned) {
+            this.maxMessageCountReturned = maxMessageCountReturned;
+            return this;
+        }
+
+        public Builder offset(final long offset) {
+            this.offset = offset;
+            return this;
+        }
+
+        public Builder endMessageId(final String endMessageId) {
+            this.endMessageId = endMessageId;
+            return this;
+        }
+
+        public Builder startMessageId(final String startMessageId) {
+            this.startMessageId = startMessageId;
+            return this;
+        }
+
+        public Builder messageRecoveryListener(final MessageRecoveryListener 
messageRecoveryListener) {
+            this.messageRecoveryListener = messageRecoveryListener;
+            return this;
+        }
+
+        public MessageRecoveryContext build() {
+            return new MessageRecoveryContext(messageRecoveryListener, 
startMessageId, endMessageId, offset, maxMessageCountReturned, 
useDedicatedCursor);
+        }
+    }
+}
+
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index e35327f848..aa0e85e0ec 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -176,9 +176,12 @@ public interface MessageStore extends Service {
      */
     void resetBatching();
 
-    void recoverNextMessages(int maxReturned, MessageRecoveryListener 
listener) throws Exception;
 
-    void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception;
+    default void recoverMessages(final MessageRecoveryContext 
messageRecoveryContext) throws Exception {
+        throw new 
UnsupportedOperationException("recoverMessages(messageRecoveryContext) is not 
supported");
+    }
+
+    void recoverNextMessages(int maxReturned, MessageRecoveryListener 
listener) throws Exception;
 
     void dispose(ConnectionContext context);
 
@@ -211,4 +214,5 @@ public interface MessageStore extends Service {
     void updateMessage(Message message) throws IOException;
 
     void registerIndexListener(IndexListener indexListener);
+
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index a4fb4be5b8..d2f953aef5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -106,13 +106,13 @@ public class ProxyMessageStore implements MessageStore {
     }
 
     @Override
-    public void recoverNextMessages(int maxReturned, MessageRecoveryListener 
listener) throws Exception {
+    public void recoverNextMessages(final int maxReturned, final 
MessageRecoveryListener listener) throws Exception {
         delegate.recoverNextMessages(maxReturned, listener);
     }
 
     @Override
-    public void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception {
-        delegate.recoverNextMessages(offset, maxReturned, listener);
+    public void recoverMessages(final MessageRecoveryContext 
messageRecoveryContext) throws Exception {
+        delegate.recoverMessages(messageRecoveryContext);
     }
 
     @Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index f0857fb960..7a0f69b04e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -111,7 +111,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
     }
 
     @Override
-    public void recoverNextMessages(int maxReturned, MessageRecoveryListener 
listener) throws Exception {
+    public void recoverNextMessages(final int maxReturned, final 
MessageRecoveryListener listener) throws Exception {
         synchronized (messageTable) {
             boolean pastLackBatch = lastBatchId == null;
             for (Map.Entry<MessageId, Message> entry : 
messageTable.entrySet()) {
@@ -130,32 +130,6 @@ public class MemoryMessageStore extends AbstractMessageStore {
         }
     }
 
-    @Override
-    public void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception {
-        synchronized (messageTable) {
-            boolean pastLackBatch = lastBatchId == null;
-            int position = 0;
-            for (Map.Entry<MessageId, Message> entry : 
messageTable.entrySet()) {
-                if(offset > 0 && offset > position) {
-                    position++;
-                    continue;
-                }
-                if (pastLackBatch) {
-                    Object msg = entry.getValue();
-                    lastBatchId = entry.getKey();
-                    if (msg.getClass() == MessageId.class) {
-                        listener.recoverMessageReference((MessageId) msg);
-                    } else {
-                        listener.recoverMessage((Message) msg);
-                    }
-                } else {
-                    pastLackBatch = entry.getKey().equals(lastBatchId);
-                }
-                position++;
-            }
-        }
-    }
-
     @Override
     public void resetBatching() {
         lastBatchId = null;
diff --git a/activemq-broker/src/test/java/org/apache/activemq/store/MessageRecoveryContextTest.java b/activemq-broker/src/test/java/org/apache/activemq/store/MessageRecoveryContextTest.java
new file mode 100644
index 0000000000..f77c94b5cd
--- /dev/null
+++ b/activemq-broker/src/test/java/org/apache/activemq/store/MessageRecoveryContextTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.*;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.junit.Test;
+
+public class MessageRecoveryContextTest {
+
+    @Test
+    public void testConfigOffset() {
+        MessageRecoveryContext messageRecoveryContext =
+                new MessageRecoveryContext.Builder()
+                    .maxMessageCountReturned(999)
+                    .messageRecoveryListener(new TestMessageRecoveryListener())
+                    .offset(10_000)
+                    .build();
+
+        assertNotNull(messageRecoveryContext);
+        assertNull(messageRecoveryContext.getEndMessageId());
+        assertEquals(Integer.valueOf(999), 
Integer.valueOf(messageRecoveryContext.getMaxMessageCountReturned()));
+        assertNotNull(messageRecoveryContext.getMessageRecoveryListener());
+        assertEquals(Long.valueOf(10_000l), 
Long.valueOf(messageRecoveryContext.getOffset()));
+        assertNull(messageRecoveryContext.getStartMessageId());
+        assertTrue(messageRecoveryContext.isUseDedicatedCursor());
+    }
+
+    @Test
+    public void testConfigOffsetNoDedicatedCursor() {
+        MessageRecoveryContext messageRecoveryContext =
+                new MessageRecoveryContext.Builder()
+                    .maxMessageCountReturned(999)
+                    .messageRecoveryListener(new TestMessageRecoveryListener())
+                    .offset(10_000)
+                    .useDedicatedCursor(false)
+                    .build();
+
+        assertNotNull(messageRecoveryContext);
+        assertNull(messageRecoveryContext.getEndMessageId());
+        assertEquals(Integer.valueOf(999), 
Integer.valueOf(messageRecoveryContext.getMaxMessageCountReturned()));
+        assertNotNull(messageRecoveryContext.getMessageRecoveryListener());
+        assertEquals(Long.valueOf(10_000l), 
Long.valueOf(messageRecoveryContext.getOffset()));
+        assertNull(messageRecoveryContext.getStartMessageId());
+        assertFalse(messageRecoveryContext.isUseDedicatedCursor());
+    }
+
+    @Test
+    public void testConfigStartEndMsgId() {
+        MessageRecoveryContext messageRecoveryContext =
+                new MessageRecoveryContext.Builder()
+                    .endMessageId("ID-end-99")
+                    .maxMessageCountReturned(77)
+                    .messageRecoveryListener(new TestMessageRecoveryListener())
+                    .startMessageId("ID-start-12")
+                    .build();
+
+        assertNotNull(messageRecoveryContext);
+        assertEquals("ID-end-99", messageRecoveryContext.getEndMessageId());
+        assertEquals(Integer.valueOf(77), 
Integer.valueOf(messageRecoveryContext.getMaxMessageCountReturned()));
+        assertNotNull(messageRecoveryContext.getMessageRecoveryListener());
+        assertNull(messageRecoveryContext.getOffset());
+        assertEquals("ID-start-12", 
messageRecoveryContext.getStartMessageId());
+        assertTrue(messageRecoveryContext.isUseDedicatedCursor());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidMaxReturned() {
+        new MessageRecoveryContext.Builder()
+            .maxMessageCountReturned(-33)
+            .messageRecoveryListener(new TestMessageRecoveryListener())
+            .build();
+}
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidMessageRecoveryListener() {
+        new MessageRecoveryContext.Builder()
+            .maxMessageCountReturned(44)
+            .messageRecoveryListener(null)
+            .startMessageId("ID-start-12")
+            .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidOffset() {
+        new MessageRecoveryContext.Builder()
+            .maxMessageCountReturned(33)
+            .messageRecoveryListener(new TestMessageRecoveryListener())
+            .offset(-1_000L)
+            .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidOffsetAndStartMessageId() {
+        new MessageRecoveryContext.Builder()
+            .maxMessageCountReturned(33)
+            .messageRecoveryListener(new TestMessageRecoveryListener())
+            .offset(1_000L)
+            .startMessageId("ID-start-37")
+            .build();
+    }
+
+    static class TestMessageRecoveryListener implements 
MessageRecoveryListener {
+        @Override
+        public boolean recoverMessageReference(MessageId ref) throws Exception 
{
+            return false;
+        }
+        @Override
+        public boolean recoverMessage(Message message) throws Exception {
+            return false;
+        }
+        @Override
+        public boolean isDuplicate(MessageId ref) {
+            return false;
+        }
+        @Override
+        public boolean hasSpace() {
+            return false;
+        }
+    }
+}
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java
index 026accf0eb..95c6622d6b 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java
@@ -22,7 +22,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -38,12 +41,15 @@ import org.apache.activemq.console.command.store.protobuf.MessagePB;
 import org.apache.activemq.console.command.store.protobuf.QueueEntryPB;
 import org.apache.activemq.console.command.store.protobuf.QueuePB;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageRecoveryContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.UTF8Buffer;
@@ -52,6 +58,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class StoreBackup {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(StoreBackup.class);
+
     static final int OPENWIRE_VERSION = 11;
     static final boolean TIGHT_ENCODING = false;
 
@@ -61,7 +69,12 @@ public class StoreBackup {
 
     String queue;
     Integer offset;
-    Integer count;
+    Integer count = Integer.MAX_VALUE;
+    String indexes;
+    Collection<Integer> indexesList;
+
+    String startMsgId;
+    String endMsgId;
 
     private final ObjectMapper mapper = new ObjectMapper();
     private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
@@ -78,24 +91,38 @@ public class StoreBackup {
 
     public void execute() throws Exception {
         if (config == null) {
-            throw new Exception("required --config option missing");
+            throw new IllegalArgumentException("required --config option 
missing");
         }
         if (filename == null) {
-            throw new Exception("required --filename option missing");
+            throw new IllegalArgumentException("required --filename option 
missing");
         }
 
         if (offset != null && count == null) {
-            throw new Exception("optional --offset and --count must be 
specified together");
+            throw new IllegalArgumentException("optional --offset and --count 
must be specified together");
+        }
+
+        if ((startMsgId != null || endMsgId != null) && queue == null) {
+            throw new IllegalArgumentException("optional --queue must be 
specified when using startMsgId or endMsgId");
+        }
+
+        if (indexes != null && !indexes.isBlank()) {
+            indexesList = Stream.of(indexes.split(","))
+                .map(index -> Integer.parseInt(index.trim()))
+                .peek(num -> {
+                    if (num < 0) {
+                        throw new IllegalArgumentException("Index value cannot 
be negative: " + num);
+                    }
+                }).collect(Collectors.toList());
         }
 
         setFile(new File(filename));
-        System.out.println("Loading: " + config);
+        logger.info("Loading config file:{} ", config);
         BrokerFactory.setStartDefault(false); // to avoid the broker 
auto-starting..
         BrokerService broker = BrokerFactory.createBroker(config);
         BrokerFactory.resetStartDefault();
         PersistenceAdapter store = broker.getPersistenceAdapter();
 
-        System.out.println("Starting: " + store);
+        logger.info("Starting: " + store);
         store.start();
         try(BufferedOutputStream fos = new BufferedOutputStream(new 
FileOutputStream(file))) {
             export(store, fos);
@@ -119,7 +146,7 @@ public class StoreBackup {
         });
 
         if (preparedTxs[0] > 0) {
-            throw new Exception("Cannot export a store with prepared XA 
transactions.  Please commit or rollback those transactions before attempting 
to backup.");
+            throw new IllegalStateException("Cannot export a store with 
prepared XA transactions.  Please commit or rollback those transactions before 
attempting to backup.");
         }
 
         for (ActiveMQDestination odest : store.getDestinations()) {
@@ -143,45 +170,58 @@ public class StoreBackup {
                 jsonMap.put("@class", "queue_destination");
                 jsonMap.put("name", dest.getQueueName());
                 String json = mapper.writeValueAsString(jsonMap);
-                System.out.println(json);
+                logger.info("Queue info:{}", json);
                 destRecord.setBindingData(new UTF8Buffer(json));
                 manager.store_queue(destRecord);
 
-                MessageRecoveryListener queueRecoveryListener = new 
MessageRecoveryListener() {
-
-                    @Override
-                    public boolean hasSpace() {
-                        return true;
+                MessageRecoveryContext.Builder builder = 
+                        new MessageRecoveryContext.Builder()
+                            .maxMessageCountReturned(count)
+                            .messageRecoveryListener(new 
MessageRecoveryListener() {
+
+                            @Override
+                            public boolean hasSpace() {
+                                return true;
+                            }
+
+                            @Override
+                            public boolean recoverMessageReference(MessageId 
ref) throws Exception {
+                                return true;
+                            }
+
+                            @Override
+                            public boolean isDuplicate(MessageId ref) {
+                                return false;
+                            }
+
+                            @Override
+                            public boolean recoverMessage(Message message) 
throws IOException {
+                                messageKeyCounter[0]++;
+                                seqKeyCounter[0]++;
+
+                                MessagePB messageRecord = 
createMessagePB(message, messageKeyCounter[0]);
+                                manager.store_message(messageRecord);
+
+                                QueueEntryPB entryRecord = 
createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], 
messageKeyCounter[0]);
+                                manager.store_queue_entry(entryRecord);
+
+                                return true;
+                            }
+                        });
+
+                if(startMsgId != null || endMsgId != null) {
+                    logger.info("Backing up from startMsgId:{} to endMsgId:{} 
", startMsgId, endMsgId);
+                    
queue.recoverMessages(builder.endMessageId(endMsgId).startMessageId(startMsgId).build());
+                } else if(indexesList != null) {
+                    logger.info("Backing up using indexes count:{}", 
indexesList.size());
+                    for(int idx : indexesList) {
+                        
queue.recoverMessages(builder.maxMessageCountReturned(1).offset(idx).build());
                     }
-
-                    @Override
-                    public boolean recoverMessageReference(MessageId ref) 
throws Exception {
-                        return true;
-                    }
-
-                    @Override
-                    public boolean isDuplicate(MessageId ref) {
-                        return false;
-                    }
-
-                    @Override
-                    public boolean recoverMessage(Message message) throws 
IOException {
-                        messageKeyCounter[0]++;
-                        seqKeyCounter[0]++;
-
-                        MessagePB messageRecord = createMessagePB(message, 
messageKeyCounter[0]);
-                        manager.store_message(messageRecord);
-
-                        QueueEntryPB entryRecord = createQueueEntryPB(message, 
containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
-                        manager.store_queue_entry(entryRecord);
-
-                        return true;
-                    }
-                };
-                if(offset != null) {
-                    queue.recoverNextMessages(offset, count, 
queueRecoveryListener);
+                } else if(offset != null) {
+                    logger.info("Backing up from offset:{} count:{} ", offset, 
count);
+                    queue.recoverMessages(builder.offset(offset).build());
                 } else {
-                    queue.recover(queueRecoveryListener);
+                    queue.recover(builder.build());
                 }
             } else if (odest instanceof ActiveMQTopic) {
                 ActiveMQTopic dest = (ActiveMQTopic) odest;
@@ -193,7 +233,6 @@ public class StoreBackup {
                     destRecord.setKey(containerKeyCounter[0]);
                     destRecord.setBindingKind(ds_kind);
 
-                    // TODO: use a real JSON encoder like jackson.
                     HashMap<String, Object> jsonMap = new HashMap<String, 
Object>();
                     jsonMap.put("@class", "dsub_destination");
                     jsonMap.put("name", sub.getClientId() + ":" + 
sub.getSubscriptionName());
@@ -205,7 +244,7 @@ public class StoreBackup {
                     }
                     jsonMap.put("noLocal", sub.isNoLocal());
                     String json = mapper.writeValueAsString(jsonMap);
-                    System.out.println(json);
+                    logger.info("Topic info:{}", json);
 
                     destRecord.setBindingData(new UTF8Buffer(json));
                     manager.store_queue(destRecord);
@@ -265,14 +304,14 @@ public class StoreBackup {
     private MessagePB createMessagePB(Message message, long messageKey) throws 
IOException {
         DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
         mos.writeBoolean(TIGHT_ENCODING);
-        mos.writeVarInt(OPENWIRE_VERSION);
+        mos.writeInt(OPENWIRE_VERSION);
         wireformat.marshal(message, mos);
 
         MessagePB messageRecord = new MessagePB();
         messageRecord.setCodec(codec_id);
         messageRecord.setMessageKey(messageKey);
         messageRecord.setSize(message.getSize());
-        messageRecord.setValue(new Buffer(mos.toBuffer().getData()));
+        messageRecord.setValue(new Buffer(mos.getData()));
         return messageRecord;
     }
 
@@ -323,4 +362,28 @@ public class StoreBackup {
     public Integer getCount() {
         return count;
     }
+
+    public void setIndexes(String indexes) {
+        this.indexes = indexes;
+    }
+
+    public String getIndexes() {
+        return indexes;
+    }
+
+    public String getStartMsgId() {
+        return startMsgId;
+    }
+
+    public void setStartMsgId(String startMsgId) {
+        this.startMsgId = startMsgId;
+    }
+
+    public String getEndMsgId() {
+        return endMsgId;
+    }
+
+    public void setEndMsgId(String endMsgId) {
+        this.endMsgId = endMsgId;
+    }
 }
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 70ddb7ab1e..8adc2f78ee 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -398,19 +398,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
 
     }
 
-    /**
-     * @param offset
-     * @param maxReturned
-     * @param listener
-     * @throws Exception
-     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
-     *      org.apache.activemq.store.MessageRecoveryListener)
-     */
-    @Override
-    public void recoverNextMessages(int offset, int maxReturned, final 
MessageRecoveryListener listener) throws Exception {
-        throw new 
UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) 
is not supported.");
-    }
-
     public void trackRollbackAck(Message message) {
         synchronized (rolledBackAcks) {
             rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), 
message);
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 26341b8b29..b987eea6d1 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -28,6 +28,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -61,6 +62,7 @@ import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.ListenableFuture;
+import org.apache.activemq.store.MessageRecoveryContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.MessageStoreStatistics;
@@ -683,8 +685,9 @@ public class KahaDBStore extends MessageDatabase implements 
PersistenceAdapter,
                         StoredDestination sd = getStoredDestination(dest, tx);
                         recoverRolledBackAcks(destination.getPhysicalName(), 
sd, tx, Integer.MAX_VALUE, listener);
                         sd.orderIndex.resetCursorPosition();
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
-                                .hasNext(); ) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = 
+                                sd.orderIndex.iterator(tx, new 
MessageOrderCursor()); listener.hasSpace() && 
+                                iterator.hasNext(); ) {
                             Entry<Long, MessageKeys> entry = iterator.next();
                             Set<String> ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
                             if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
@@ -733,35 +736,61 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
         }
 
         @Override
-        public void recoverNextMessages(final int offset, final int 
maxReturned, final MessageRecoveryListener listener) throws Exception {
+        public void recoverMessages(final MessageRecoveryContext 
messageRecoveryContext) throws Exception {
+
+            if(messageRecoveryContext == null || 
+               (messageRecoveryContext.getStartMessageId() != null &&
+                messageRecoveryContext.getOffset() != null)) {
+                LOG.warn("Invalid messageRecoveryContext:{}", 
messageRecoveryContext);
+                throw new IllegalArgumentException("Invalid 
messageRecoveryContext specified");
+            }
+
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
+
+                        Long startSequenceOffset = null;
+                        Long endSequenceOffset = null;
+
+                        if(messageRecoveryContext.getStartMessageId() != null 
&& !messageRecoveryContext.getStartMessageId().isBlank()) {
+                            startSequenceOffset = 
Optional.ofNullable(sd.messageIdIndex.get(tx, 
messageRecoveryContext.getStartMessageId())).orElse(0L);
+                        } else {
+                            startSequenceOffset = 
Optional.ofNullable(messageRecoveryContext.getOffset()).orElse(0L);
+                        }
+
+                        if(messageRecoveryContext.getEndMessageId() != null && 
!messageRecoveryContext.getEndMessageId().isBlank()) {
+                            endSequenceOffset = 
Optional.ofNullable(sd.messageIdIndex.get(tx, 
messageRecoveryContext.getEndMessageId()))
+                                                        
.orElse(startSequenceOffset + 
Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned()));
+                        } else {
+                            endSequenceOffset = startSequenceOffset + 
Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned());
+                        }
+
+                        if(endSequenceOffset < startSequenceOffset) {
+                            LOG.warn("Invalid offset parameters start:{} 
end:{}", startSequenceOffset, endSequenceOffset);
+                            throw new IllegalStateException("Invalid offset 
parameters start:" + startSequenceOffset + " end:" + endSequenceOffset);
+                        }
+
+                        
messageRecoveryContext.setEndSequenceId(endSequenceOffset);
                         Entry<Long, MessageKeys> entry = null;
-                        int position = 0;
-                        int counter = 
recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, 
listener);
-                        Set ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
+                        recoverRolledBackAcks(destination.getPhysicalName(), 
sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), 
messageRecoveryContext);
+                        Set<String> ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
+                        Iterator<Entry<Long, MessageKeys>> iterator = 
(messageRecoveryContext.isUseDedicatedCursor() ? sd.orderIndex.iterator(tx, new 
MessageOrderCursor(startSequenceOffset)) : sd.orderIndex.iterator(tx));
+
+                        while (iterator.hasNext()) {
                             entry = iterator.next();
 
                             if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
 
-                            if(offset > 0 && offset > position) {
-                                position++;
-                                continue;
-                            }
-
                             Message msg = 
loadMessage(entry.getValue().location);
                             
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
-                            listener.recoverMessage(msg);
-                            counter++;
-                            position++;
-                            if (counter >= maxReturned || 
!listener.canRecoveryNextMessage()) {
+
+                            messageRecoveryContext.recoverMessage(msg);
+                            if 
(!messageRecoveryContext.canRecoveryNextMessage(entry.getKey())) {
                                 break;
                             }
                         }
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 7835a1b7b6..f7a49b8c18 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -241,43 +241,11 @@ public class TempKahaDBStore extends TempMessageDatabase 
implements PersistenceA
             }
         }
 
-        @Override
-        public void recoverNextMessages(final int offset, final int 
maxReturned, final MessageRecoveryListener listener) throws Exception {
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<Exception>(){
-                    @Override
-                    public void execute(Transaction tx) throws Exception {
-                        StoredDestination sd = getStoredDestination(dest, tx);
-                        Entry<Long, MessageRecord> entry=null;
-                        int counter = 0;
-                        int position = 0;
-                        for (Iterator<Entry<Long, MessageRecord>> iterator = 
sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
-                            entry = iterator.next();
-                            if(offset > 0 && offset > position) {
-                                position++;
-                                continue;
-                            }
-                            listener.recoverMessage( (Message) 
wireFormat.unmarshal(entry.getValue().data ) );
-                            counter++;
-                            position++;
-                            if( counter >= maxReturned ) {
-                                break;
-                            }
-                        }
-                        if( entry!=null ) {
-                            cursorPos = entry.getKey()+1;
-                        }
-                    }
-                });
-            }
-        }
-
         @Override
         public void resetBatching() {
             cursorPos=0;
         }
 
-
         @Override
         public void setBatch(MessageId identity) throws IOException {
             final String key = identity.toProducerKey();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index cbb1579b57..5a1ab90d58 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -510,11 +510,6 @@ public class StoreQueueCursorOrderTest {
             }
         }
 
-        @Override
-        public void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception {
-
-        }
-
         @Override
         public void setBatch(MessageId message) {
             batch.set((Long)message.getFutureOrSequenceLong());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
index 9aefbbf385..f5df1fb9c8 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
@@ -27,11 +27,14 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -44,8 +47,10 @@ import static org.junit.Assert.assertEquals;
 
 public class KahaDBOffsetRecoveryListenerTest {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(KahaDBOffsetRecoveryListenerTest.class);
+
     protected BrokerService brokerService = null;
-    protected KahaDBStore kaha = null;
+    protected BrokerService restartBrokerService = null;
 
     @Before
     public void beforeEach() throws Exception {
@@ -55,7 +60,7 @@ public class KahaDBOffsetRecoveryListenerTest {
     @After
     public void afterEach() {
         brokerService = null;
-        kaha = null;
+        restartBrokerService = null;
     }
 
     protected BrokerService createBroker(KahaDBStore kaha) throws Exception {
@@ -69,39 +74,67 @@ public class KahaDBOffsetRecoveryListenerTest {
 
     private KahaDBStore createStore(boolean delete) throws IOException {
         KahaDBStore kaha = new KahaDBStore();
-        kaha.setDirectory(new 
File("target/activemq-data/kahadb-recovery-tests"));
+        kaha.setJournalMaxFileLength(1024*100);
+        kaha.setDirectory(new File("target" + File.separator + "activemq-data" 
+ File.separator + "kahadb-recovery-tests"));
         if( delete ) {
             kaha.deleteAllMessages();
         }
         return kaha;
     }
 
-    protected void runOffsetTest(int sendCount, int expectedMessageCount, int 
recoverOffset, int recoverCount, int expectedRecoverCount, int 
expectedRecoverIndex, String queueName) throws Exception {
-        kaha = createStore(true);
-        kaha.setJournalMaxFileLength(1024*100);
-        brokerService = createBroker(kaha);
+    protected void runOffsetTest(final int sendCount, final int 
expectedMessageCount, final int recoverOffset, final int recoverCount, final 
int expectedRecoverCount, final int expectedRecoverIndex, final String 
queueName) throws Exception {
+        runOffsetLoopTest(sendCount, expectedMessageCount, recoverOffset, 
recoverCount, expectedRecoverCount, expectedRecoverIndex, queueName, 1, false);
+    }
+
+    protected void runOffsetLoopTest(final int sendCount, final int 
expectedMessageCount, final int recoverOffset, final int recoverCount, final 
int expectedRecoverCount, final int expectedRecoverIndex, final String 
queueName, final int loopCount, final boolean repeatExpected) throws Exception {
+        KahaDBStore kahaDBStore = createStore(true);
+        brokerService = createBroker(kahaDBStore);
         sendMessages(sendCount, queueName);
-        brokerService.stop();
-        brokerService.waitUntilStopped();
 
-        TestMessageRecoveryListener testMessageRecoveryListener = new 
TestMessageRecoveryListener();
-        kaha = createStore(false);
-        kaha.start();
-        MessageStore messageStore = kaha.createQueueMessageStore(new 
ActiveMQQueue(queueName));
-        messageStore.start();
-        assertEquals(Integer.valueOf(expectedMessageCount), 
Integer.valueOf(messageStore.getMessageCount()));
-        messageStore.recoverNextMessages(recoverOffset, recoverCount, 
testMessageRecoveryListener);
-        messageStore.stop();
-        kaha.stop();
+        MessageStore messageStore = kahaDBStore.createQueueMessageStore(new 
ActiveMQQueue(queueName));
+
+        int tmpExpectedRecoverCount = expectedRecoverCount;
+        int tmpExpectedRecoverIndex = expectedRecoverIndex;
+        int tmpRecoverOffset = recoverOffset;
+
+        for(int i=0; i<loopCount; i++) {
+            logger.info("Loop:{} recoverOffset:{} expectedRecoverCount:{} 
expectedRecoverIndex:{}", loopCount, tmpRecoverOffset, tmpExpectedRecoverCount, 
tmpExpectedRecoverIndex);
+
+            TestMessageRecoveryListener testMessageRecoveryListener = new 
TestMessageRecoveryListener();
+            assertEquals(Integer.valueOf(expectedMessageCount), 
Integer.valueOf(messageStore.getMessageCount()));
 
-        assertEquals(Integer.valueOf(expectedRecoverCount), 
Integer.valueOf(testMessageRecoveryListener.getRecoveredMessages().size()));
+            messageStore.recoverMessages(new MessageRecoveryContext.Builder()
+                    .messageRecoveryListener(testMessageRecoveryListener)
+                    .offset(tmpRecoverOffset)
+                    .maxMessageCountReturned(recoverCount).build());
 
-        if(expectedRecoverIndex >= 0) {
-            assertEquals(Integer.valueOf(expectedRecoverIndex), 
(Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index"));
+            assertEquals(Integer.valueOf(tmpExpectedRecoverCount), 
Integer.valueOf(testMessageRecoveryListener.getRecoveredMessages().size()));
+
+            if(tmpExpectedRecoverIndex >= 0) {
+                assertEquals(Integer.valueOf(tmpExpectedRecoverIndex), 
(Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index"));
+            }
+
+            if(!repeatExpected) {
+                int nextExpectedRecoverCount = 
calculateExpectedRecoverCount(tmpRecoverOffset, tmpExpectedRecoverCount, 
expectedMessageCount);
+                int nextExpectedRecoverIndex = 
calculateExpectedRecoverIndex(tmpRecoverOffset, tmpExpectedRecoverCount, 
tmpExpectedRecoverIndex, expectedMessageCount);
+                int nextRecoverOffset = 
calculateRecoverOffset(tmpRecoverOffset, recoverCount, expectedMessageCount);
+    
+                tmpExpectedRecoverCount = nextExpectedRecoverCount;
+                tmpExpectedRecoverIndex = nextExpectedRecoverIndex;
+                tmpRecoverOffset = nextRecoverOffset;
+            }
         }
 
-        brokerService = createBroker(kaha);
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+
+        restartBrokerService = createBroker(createStore(false));
+        restartBrokerService.start();
+        restartBrokerService.waitUntilStarted(30_000l);
         assertEquals(sendCount, receiveMessages(queueName));
+
+        restartBrokerService.stop();
+        restartBrokerService.waitUntilStopped();
     }
 
     @Test
@@ -134,10 +167,18 @@ public class KahaDBOffsetRecoveryListenerTest {
         runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY");
     }
 
+    @Test
+    public void testOffsetWalk() throws Exception {
+        runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000, 
"TEST.OFFSET.WALK", 8, false);
+    }
+
+    @Test
+    public void testOffsetRepeat() throws Exception {
+        runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, 
"TEST.OFFSET.REPEAT", 10, true);
+    }
+
     private void sendMessages(int count, String queueName) throws JMSException 
{
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
-        cf.setUseAsyncSend(true);
-        cf.setProducerWindowSize(1024);
         cf.setWatchTopicAdvisories(false);
 
         Connection connection = cf.createConnection();
@@ -179,6 +220,38 @@ public class KahaDBOffsetRecoveryListenerTest {
         return sb.toString();
     }
 
+    private static int calculateExpectedRecoverCount(final int recoverOffset, 
final int expectedRecoverCount, final int expectedMessageCount) {
+        int nextOffset = calculateRecoverOffset(recoverOffset, 
expectedRecoverCount, expectedMessageCount);
+        if(nextOffset >= expectedMessageCount) {
+            return 0;
+        }
+
+        int nextRange = nextOffset + expectedRecoverCount;
+        int remaining = expectedMessageCount - nextRange;
+
+        if(remaining <= 0) {
+            return expectedRecoverCount - remaining;
+        }
+
+        return expectedRecoverCount;
+    }
+
+    private static int calculateExpectedRecoverIndex(final int recoverOffset, 
final int expectedRecoverCount, final int expectedRecoverIndex, final int 
expectedMessageCount) {
+        int nextOffset = calculateRecoverOffset(recoverOffset, 
expectedRecoverCount, expectedMessageCount);
+
+        if(nextOffset >= (expectedMessageCount - 1)) {
+            return -1;
+        }
+
+        return nextOffset;
+    }
+
+    private static int calculateRecoverOffset(final int recoverOffset, final 
int expectedRecoverCount, final int expectedMessageCount) {
+        return (recoverOffset + expectedRecoverCount);
+    }
+
+    // int tmpRecoverOffset = recoverOffset;
+
     static class TestMessageRecoveryListener implements 
MessageRecoveryListener {
 
         List<MessageId> recoveredMessageIds = new LinkedList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to