This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8599917222 ARTEMIS-4141 Update credits even for expired messages
8599917222 is described below
commit 85999172227bff29e8c5ffeef9340132d1413a35
Author: Nicolas Filotto <[email protected]>
AuthorDate: Fri Sep 22 09:06:11 2023 +0200
ARTEMIS-4141 Update credits even for expired messages
When large messages are produced and a consumer receives an expired message,
the credits are not updated. As a result, if the consumer is too slow and an
expiry delay has been set, the consumer can run out of credits, which prevents
it from receiving any more messages.
---
.../artemis/api/core/client/MessageHandler.java | 9 +++
.../core/client/impl/ClientConsumerImpl.java | 11 ++-
.../jms/client/JMSMessageListenerWrapper.java | 9 +++
.../artemis/ra/inflow/ActiveMQMessageHandler.java | 11 +++
.../jms/client/SlowLargeMessageConsumerTest.java | 83 ++++++++++++++++++++++
5 files changed, 120 insertions(+), 3 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
index 4f39d1cc5b..64fa15d352 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
@@ -32,4 +32,13 @@ public interface MessageHandler {
* @param message a message
*/
void onMessage(ClientMessage message);
+
+ /**
+ * Notifies the MessageHandler that an expired message has been received.
+ * <p>
+ * The default implementation does nothing; implementations may override it to react to the expired message.
+ *
+ * @param message the expired message that was received
+ */
+ default void onMessageExpired(ClientMessage message) {
+ // Do nothing by default
+ }
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 6a730a0972..75b5621843 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -992,13 +992,18 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
logger.trace("{}::Handler.onMessage done", this);
- if (message.isLargeMessage()) {
- message.discardBody();
- }
} else {
+ theHandler.onMessageExpired(message);
+
+ logger.trace("{}::Handler.onMessageExpired done", this);
+
session.expire(this, message);
}
+ if (message.isLargeMessage()) {
+ message.discardBody();
+ }
+
// If slow consumer, we need to send 1 credit to make sure we get
another message
if (clientWindowSize == 0) {
startSlowConsumer();
diff --git
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index f24e90db02..ec7bba23c2 100644
---
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -143,4 +143,13 @@ public class JMSMessageListenerWrapper implements
MessageHandler {
session.setRecoverCalled(false);
}
+
+ @Override
+ public void onMessageExpired(ClientMessage message) { // callback for an expired message; ClientConsumerImpl calls it right before session.expire(...)
+ try {
+ message.checkCompletion(); // NOTE(review): checkCompletion() is not defined in this patch; presumably it completes reception of the message body so that consumer credits get updated (ARTEMIS-4141) - confirm
+ } catch (ActiveMQException e) {
+ ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); // the failure is only logged, it is not rethrown to the caller
+ }
+ }
}
diff --git
a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 8be00955ad..2a3cf24048 100644
---
a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -42,6 +42,7 @@ import
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSClientLogger;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import
org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
@@ -397,6 +398,16 @@ public class ActiveMQMessageHandler implements
MessageHandler, FailoverEventList
}
+
+ @Override
+ public void onMessageExpired(ClientMessage message) { // callback for an expired message; ClientConsumerImpl calls it right before session.expire(...)
+ try {
+ message.checkCompletion(); // NOTE(review): checkCompletion() is not defined in this patch; presumably it completes reception of the message body so that consumer credits get updated (ARTEMIS-4141) - confirm
+ } catch (ActiveMQException e) {
+ ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); // the failure is only logged (via the JMS client logger), it is not rethrown
+ }
+ }
+
public void start() throws ActiveMQException {
session.start();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java
new file mode 100644
index 0000000000..7354edf5e7
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.Test;
+
+public class SlowLargeMessageConsumerTest extends JMSTestBase { // regression test for ARTEMIS-4141: a slow listener on a topic with an expiry delay must keep receiving messages
+
+ private static final String TOPIC = "SlowLargeMessageConsumerTopic";
+
+
+ @Override
+ protected void extraServerConfig(ActiveMQServer server) {
+ server.getConfiguration().getAddressSettings().put(TOPIC, new
AddressSettings().setExpiryDelay(100L).setMaxSizeBytes(1024)); // address settings apply to TOPIC only: expiry delay 100, max size 1024 bytes
+ }
+
+ /**
+ * Sends 100 object messages of 512 KiB each (one every 25 ms) to a listener that sleeps
+ * 50 ms per message, then stops the listener from sleeping, sends one more message and
+ * asserts that the listener receives a message within 500 ms.
+ *
+ * @throws Exception if any JMS operation or sleep fails
+ * @see <a href="https://issues.apache.org/jira/browse/ARTEMIS-4141">ARTEMIS-4141</a>
+ */
+ @Test
+ public void ensureSlowConsumerOfLargeMessageNeverGetsStuck() throws
Exception {
+ try (Connection conn = cf.createConnection()) {
+ conn.start();
+ try (Session sessionConsumer = conn.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session sessionProducer = conn.createSession(false,
Session.CLIENT_ACKNOWLEDGE)) {
+ final Destination topic = sessionConsumer.createTopic(TOPIC);
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(topic);
+ final AtomicBoolean slow = new AtomicBoolean(true); // while true the listener delays every message
+ final CountDownLatch messageReceived = new CountDownLatch(1); // released by the first message handled after the slow phase
+ consumer.setMessageListener(message -> {
+ if (slow.get()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50); // simulate a slow consumer: twice the producer's 25 ms send interval
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+ }
+ } else {
+ messageReceived.countDown();
+ }
+ });
+ final MessageProducer producer =
sessionProducer.createProducer(topic);
+ int msgSize = 512 * 1024; // payload size in bytes (512 KiB)
+ for (int i = 0; i < 100; i++) {
+
producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize)));
+ TimeUnit.MILLISECONDS.sleep(25);
+ }
+ TimeUnit.MILLISECONDS.sleep(100); // same value as the expiry delay set in extraServerConfig; presumably lets pending messages expire - confirm
+ slow.set(false);
+
producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize)));
+ assertTrue(messageReceived.await(500, TimeUnit.MILLISECONDS)); // fails if no message reaches the listener within 500 ms, i.e. the consumer is stuck
+ }
+ }
+ }
+}