This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 781a4132ca ARTEMIS-3110 Contract openwire credit window when pull times out
781a4132ca is described below

commit 781a4132ca1e6cc7ba0fc14c9ecef45df70afd44
Author: Timothy Bish <[email protected]>
AuthorDate: Fri May 30 11:28:28 2025 -0400

    ARTEMIS-3110 Contract openwire credit window when pull times out
    
    Ensure that outstanding credit is reduced when an openwire pull consumer
    issues a pull command and no message is dispatched. Pull consumers should
    only have an open credit window while the pull command window is open.
---
 .../core/protocol/openwire/amq/AMQConsumer.java    |  13 ++-
 .../openwire/amq/JMSPullConsumerTest.java          | 103 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 3 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 635d50c93b..4ac10382b5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -306,7 +306,8 @@ public class AMQConsumer {
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(MessageId.class, 
dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
-         currentWindow.decrementAndGet();
+         // Prevent races with other updates that can lead to credit going negative and starving consumers.
+         currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
          return size;
       } catch (Throwable t) {
          logger.warn("Error during message dispatch", t);
@@ -430,7 +431,6 @@ public class AMQConsumer {
       }
    }
 
-
    public org.apache.activemq.command.ActiveMQDestination 
getOpenwireDestination() {
       return openwireDestination;
    }
@@ -468,7 +468,7 @@ public class AMQConsumer {
     */
    private class MessagePullHandler {
 
-      private long next = -1;
+      private volatile long next = -1;
       private long timeout;
       private CountDownLatch latch = new CountDownLatch(1);
       private ScheduledFuture<?> messagePullFuture;
@@ -492,10 +492,17 @@ public class AMQConsumer {
          if 
(message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
             if (next >= 0) {
                if (timeout <= 0) {
+                  // Prevent races with other updates that can lead to credit going negative and starving consumers.
+                  currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
                   latch.countDown();
                } else {
                   messagePullFuture = scheduledPool.schedule(() -> {
                      if (next >= 0) {
+                        // Timed pull did not get a message before the timeout, reduce credit. This
+                        // can race with an actual message arriving so we must ensure we don't reduce
+                        // credit below zero as we want credit to always be zero or on active pull it
+                        // should be one (greater than one indicates a broken client implementation).
+                        currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
                         handleDeliverNullDispatch();
                      }
                   }, timeout, TimeUnit.MILLISECONDS);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
new file mode 100644
index 0000000000..318af3233f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import 
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class JMSPullConsumerTest extends BasicOpenWireTest {
+
+   @Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams() {
+      return Arrays.asList(new Object[][]{{DeliveryMode.NON_PERSISTENT, 
ActiveMQDestination.QUEUE_TYPE},
+                                          {DeliveryMode.NON_PERSISTENT, 
ActiveMQDestination.TEMP_QUEUE_TYPE},
+                                          {DeliveryMode.PERSISTENT, 
ActiveMQDestination.QUEUE_TYPE},
+                                          {DeliveryMode.PERSISTENT, 
ActiveMQDestination.TEMP_QUEUE_TYPE}});
+   }
+
+   public ActiveMQDestination destination;
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSPullConsumerTest(int deliveryMode, byte destinationType) {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @TestTemplate
+   public void 
testReceiveMessageWithZeroPrefetchDoesNotOverConsumeWhenMessagesAdded() throws 
Exception {
+      // Receive a message with the JMS API using pull consumers
+      connection.getPrefetchPolicy().setAll(0);
+      connection.start();
+
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
+
+      assertNull(consumer.receive(100));
+      assertNull(consumer.receive(100));
+      assertNull(consumer.receive(100));
+
+      sendMessages(session, destination, 3);
+
+      assertNotNull(consumer.receive(100));
+
+      ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
+      ActiveMQMessageConsumer consumer3 = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
+
+      assertNotNull(consumer2.receive(100));
+      assertNotNull(consumer3.receive(100));
+   }
+
+   @TestTemplate
+   public void 
testReceiveMessageWithZeroPrefetchDoesNotRetainCreditAfterTimedPulls() throws 
Exception {
+      // Receive a message with the JMS API using pull consumers
+      connection.getPrefetchPolicy().setAll(0);
+      connection.start();
+
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
+
+      assertNull(consumer.receive(100));
+      assertNull(consumer.receive(100));
+      assertNull(consumer.receive(100));
+
+      sendMessages(session, destination, 3);
+
+      ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
+
+      assertNotNull(consumer2.receive(100));
+      assertNotNull(consumer2.receive(100));
+      assertNotNull(consumer2.receive(100));
+
+      assertNull(consumer.receiveNoWait());
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to