This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 781a4132ca ARTEMIS-3110 Contract openwire credit window when pull times out
781a4132ca is described below
commit 781a4132ca1e6cc7ba0fc14c9ecef45df70afd44
Author: Timothy Bish <[email protected]>
AuthorDate: Fri May 30 11:28:28 2025 -0400
ARTEMIS-3110 Contract openwire credit window when pull times out
Ensure that outstanding credit is reduced when an openwire pull consumer
issues a pull command and no message is dispatched. Pull consumers should
only have an open credit window while the pull command window is open.
---
.../core/protocol/openwire/amq/AMQConsumer.java | 13 ++-
.../openwire/amq/JMSPullConsumerTest.java | 103 +++++++++++++++++++++
2 files changed, 113 insertions(+), 3 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 635d50c93b..4ac10382b5 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -306,7 +306,8 @@ public class AMQConsumer {
int size = dispatch.getMessage().getSize();
reference.setProtocolData(MessageId.class,
dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
- currentWindow.decrementAndGet();
+ // Prevent races with other updates that can lead to credit going negative and starving consumers.
+ currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
return size;
} catch (Throwable t) {
logger.warn("Error during message dispatch", t);
@@ -430,7 +431,6 @@ public class AMQConsumer {
}
}
-
public org.apache.activemq.command.ActiveMQDestination
getOpenwireDestination() {
return openwireDestination;
}
@@ -468,7 +468,7 @@ public class AMQConsumer {
*/
private class MessagePullHandler {
- private long next = -1;
+ private volatile long next = -1;
private long timeout;
private CountDownLatch latch = new CountDownLatch(1);
private ScheduledFuture<?> messagePullFuture;
@@ -492,10 +492,17 @@ public class AMQConsumer {
if
(message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
if (next >= 0) {
if (timeout <= 0) {
+ // Prevent races with other updates that can lead to credit going negative and starving consumers.
+ currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
latch.countDown();
} else {
messagePullFuture = scheduledPool.schedule(() -> {
if (next >= 0) {
+ // Timed pull did not get a message before the timeout, reduce credit. This
+ // can race with an actual message arriving so we must ensure we don't reduce
+ // credit below zero as we want credit to always be zero or on active pull it
+ // should be one (greater than one indicates a broken client implementation).
+ currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
handleDeliverNullDispatch();
}
}, timeout, TimeUnit.MILLISECONDS);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
new file mode 100644
index 0000000000..318af3233f
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class JMSPullConsumerTest extends BasicOpenWireTest {
+
+ @Parameters(name = "deliveryMode={0} destinationType={1}")
+ public static Collection<Object[]> getParams() {
+ return Arrays.asList(new Object[][]{{DeliveryMode.NON_PERSISTENT,
ActiveMQDestination.QUEUE_TYPE},
+ {DeliveryMode.NON_PERSISTENT,
ActiveMQDestination.TEMP_QUEUE_TYPE},
+ {DeliveryMode.PERSISTENT,
ActiveMQDestination.QUEUE_TYPE},
+ {DeliveryMode.PERSISTENT,
ActiveMQDestination.TEMP_QUEUE_TYPE}});
+ }
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public byte destinationType;
+
+ public JMSPullConsumerTest(int deliveryMode, byte destinationType) {
+ this.deliveryMode = deliveryMode;
+ this.destinationType = destinationType;
+ }
+
+ @TestTemplate
+ public void
testReceiveMessageWithZeroPrefetchDoesNotOverConsumeWhenMessagesAdded() throws
Exception {
+ // Receive a message with the JMS API using pull consumers
+ connection.getPrefetchPolicy().setAll(0);
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(destination);
+
+ assertNull(consumer.receive(100));
+ assertNull(consumer.receive(100));
+ assertNull(consumer.receive(100));
+
+ sendMessages(session, destination, 3);
+
+ assertNotNull(consumer.receive(100));
+
+ ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer)
session.createConsumer(destination);
+ ActiveMQMessageConsumer consumer3 = (ActiveMQMessageConsumer)
session.createConsumer(destination);
+
+ assertNotNull(consumer2.receive(100));
+ assertNotNull(consumer3.receive(100));
+ }
+
+ @TestTemplate
+ public void
testReceiveMessageWithZeroPrefetchDoesNotRetainCreditAfterTimedPulls() throws
Exception {
+ // Receive a message with the JMS API using pull consumers
+ connection.getPrefetchPolicy().setAll(0);
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(destination);
+
+ assertNull(consumer.receive(100));
+ assertNull(consumer.receive(100));
+ assertNull(consumer.receive(100));
+
+ sendMessages(session, destination, 3);
+
+ ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer)
session.createConsumer(destination);
+
+ assertNotNull(consumer2.receive(100));
+ assertNotNull(consumer2.receive(100));
+ assertNotNull(consumer2.receive(100));
+
+ assertNull(consumer.receiveNoWait());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact