This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
new 9106a9838f [AMQ-9595] Fix recoverNextMessages when there are messages
consumed farther than maxBatchSize.
9106a9838f is described below
commit 9106a9838f8412f8b1ae87d1391bf5b15879cce1
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Oct 18 15:26:32 2024 -0700
[AMQ-9595] Fix recoverNextMessages when there are messages consumed farther
than maxBatchSize.
(cherry picked from commit 6b08e104208c1a344a1ab8a312450f855415ceb1)
---
.../apache/activemq/store/kahadb/KahaDBStore.java | 3 +-
.../DurableSubscriptionPartialAckTest.java | 114 +++++++++++++++++++++
2 files changed, 115 insertions(+), 2 deletions(-)
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index b74759c6b4..836e73f44e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1239,14 +1239,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc =
sd.subscriptionCursors.get(subscriptionKey);
- SequenceSet subAckPositions = null;
+ SequenceSet subAckPositions = getSequenceSet(tx, sd,
subscriptionKey);;
if (moc == null) {
LastAck pos = getLastAck(tx, sd, subscriptionKey);
if (pos == null) {
// sub deleted
return;
}
- subAckPositions = getSequenceSet(tx, sd,
subscriptionKey);
//If we have ackPositions tracked then compare the
first one as individual acknowledge mode
//may have bumped lastAck even though there are
earlier messages to still consume
if (subAckPositions != null &&
!subAckPositions.isEmpty()
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionPartialAckTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionPartialAckTest.java
new file mode 100644
index 0000000000..31776199a9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionPartialAckTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import jakarta.jms.Connection;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import jakarta.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class DurableSubscriptionPartialAckTest {
+
+ private BrokerService createBroker() throws Exception {
+ BrokerService broker = BrokerFactory.createBroker("broker:(vm://" +
getClass().getName() + ")");
+ broker.setBrokerName("broker");
+ broker.setAdvisorySupport(false);
+ File dir = broker.getBrokerDataDirectory();
+ if (dir != null) {
+ IOHelper.deleteChildren(dir);
+ }
+ return broker;
+ }
+
+ @Test
+ public void test() throws Exception {
+ BrokerService broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+
+ ActiveMQTopic topic = new ActiveMQTopic("TOPIC.TEST");
+ String subName1 = "SUB1";
+ String subName2 = "SUB2";
+ int numberOfMessages = 10_000;
+
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(broker.getVmConnectorURI());
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.setClientID("CLIENT_ID");
+ connection.start();
+
+ Session session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ TopicSubscriber subscriber =
session.createDurableSubscriber(topic, subName1);
+ session.createDurableSubscriber(topic, subName2).close();
+
+ MessageProducer producer = session.createProducer(topic);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(Integer.toString(i));
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(Integer.toString(i), ((TextMessage)
receivedMessage).getText());
+ if (i % 2 == 0) {
+ receivedMessage.acknowledge();
+ }
+ }
+ }
+ broker.stop();
+ broker.waitUntilStopped();
+ broker.start();
+ broker.waitUntilStarted();
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.setClientID("CLIENT_ID");
+ connection.start();
+
+ Session session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ TopicSubscriber subscriber =
session.createDurableSubscriber(topic, subName1);
+
+ for (int i = 1; i < numberOfMessages; i += 2) {
+ Message receivedMessage = subscriber.receive(10000);
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(Integer.toString(i), ((TextMessage)
receivedMessage).getText());
+
+ receivedMessage.acknowledge();
+ }
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact