This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-6.1.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.1.x by this push:
     new 655ee900ba [AMQ-9595] Fix recoverNextMessages when there are messages consumed farther than maxBatchSize.
655ee900ba is described below

commit 655ee900ba925ed6faec28e5fd4255ff9906bd37
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Oct 18 15:26:32 2024 -0700

    [AMQ-9595] Fix recoverNextMessages when there are messages consumed farther than maxBatchSize.
    
    (cherry picked from commit 6b08e104208c1a344a1ab8a312450f855415ceb1)
---
 .../apache/activemq/store/kahadb/KahaDBStore.java  |   3 +-
 .../DurableSubscriptionPartialAckTest.java         | 114 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index bde01d0404..27dfa12718 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1198,14 +1198,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                         StoredDestination sd = getStoredDestination(dest, tx);
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
-                        SequenceSet subAckPositions = null;
+                        SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);;
                         if (moc == null) {
                             LastAck pos = getLastAck(tx, sd, subscriptionKey);
                             if (pos == null) {
                                 // sub deleted
                                 return;
                             }
-                            subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
                             //If we have ackPositions tracked then compare the first one as individual acknowledge mode
                             //may have bumped lastAck even though there are earlier messages to still consume
                             if (subAckPositions != null && !subAckPositions.isEmpty()
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionPartialAckTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionPartialAckTest.java
new file mode 100644
index 0000000000..31776199a9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionPartialAckTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import jakarta.jms.Connection;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import jakarta.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class DurableSubscriptionPartialAckTest {
+
+    private BrokerService createBroker() throws Exception {
+        BrokerService broker = BrokerFactory.createBroker("broker:(vm://" + getClass().getName() + ")");
+        broker.setBrokerName("broker");
+        broker.setAdvisorySupport(false);
+        File dir = broker.getBrokerDataDirectory();
+        if (dir != null) {
+            IOHelper.deleteChildren(dir);
+        }
+        return broker;
+    }
+
+    @Test
+    public void test() throws Exception {
+        BrokerService broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        ActiveMQTopic topic = new ActiveMQTopic("TOPIC.TEST");
+        String subName1 = "SUB1";
+        String subName2 = "SUB2";
+        int numberOfMessages = 10_000;
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        try (Connection connection = connectionFactory.createConnection()) {
+            connection.setClientID("CLIENT_ID");
+            connection.start();
+
+            Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+            TopicSubscriber subscriber = session.createDurableSubscriber(topic, subName1);
+            session.createDurableSubscriber(topic, subName2).close();
+
+            MessageProducer producer = session.createProducer(topic);
+
+            for (int i = 0; i < numberOfMessages; i++) {
+                ActiveMQTextMessage message = new ActiveMQTextMessage();
+                message.setText(Integer.toString(i));
+                producer.send(message);
+            }
+
+            for (int i = 0; i < numberOfMessages; i++) {
+                Message receivedMessage = subscriber.receive(1000);
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+                assertEquals(Integer.toString(i), ((TextMessage) receivedMessage).getText());
+                if (i % 2 == 0) {
+                    receivedMessage.acknowledge();
+                }
+            }
+        }
+        broker.stop();
+        broker.waitUntilStopped();
+        broker.start();
+        broker.waitUntilStarted();
+        try (Connection connection = connectionFactory.createConnection()) {
+            connection.setClientID("CLIENT_ID");
+            connection.start();
+
+            Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+            TopicSubscriber subscriber = session.createDurableSubscriber(topic, subName1);
+
+            for (int i = 1; i < numberOfMessages; i += 2) {
+                Message receivedMessage = subscriber.receive(10000);
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+                assertEquals(Integer.toString(i), ((TextMessage) receivedMessage).getText());
+
+                receivedMessage.acknowledge();
+            }
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to