This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a5f317dcdb ARTEMIS-5093 support configurable onMessage timeout 
w/closing consumer
a5f317dcdb is described below

commit a5f317dcdb0d272a6565fa205a893ee78e9fab24
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Oct 10 14:55:54 2024 -0500

    ARTEMIS-5093 support configurable onMessage timeout w/closing consumer
---
 .../artemis/api/config/ServerLocatorConfig.java    |  1 +
 .../artemis/api/core/client/ActiveMQClient.java    |  2 +
 .../artemis/api/core/client/ServerLocator.java     | 19 ++++++
 .../artemis/core/client/ActiveMQClientLogger.java  |  4 +-
 .../core/client/impl/ClientConsumerImpl.java       | 19 +++---
 .../core/client/impl/ClientSessionFactoryImpl.java |  2 +-
 .../core/client/impl/ClientSessionImpl.java        |  7 ++-
 .../core/client/impl/ServerLocatorImpl.java        | 12 ++++
 .../protocol/core/impl/ActiveMQSessionContext.java |  5 +-
 .../artemis/spi/core/remoting/SessionContext.java  |  3 +-
 .../apache/activemq/artemis/utils/FutureLatch.java |  8 +++
 .../client/HornetQClientSessionContext.java        |  5 +-
 .../integration/client/MessageHandlerTest.java     | 73 ++++++++++++++++++++--
 13 files changed, 139 insertions(+), 21 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
index dc3180f7a9..cd57f98449 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
@@ -47,6 +47,7 @@ public class ServerLocatorConfig {
    public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
    public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
    public int initialMessagePacketSize = 
ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+   public int onMessageCloseTimeout = 
ActiveMQClient.DEFAULT_ONMESSAGE_CLOSE_TIMEOUT;
    public boolean cacheLargeMessagesClient = 
ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
    public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
    public boolean compressLargeMessage = 
ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 94feade8e7..e5b5229f1a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -138,6 +138,8 @@ public final class ActiveMQClient {
 
    public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
 
+   public static final int DEFAULT_ONMESSAGE_CLOSE_TIMEOUT = 10_000;
+
    public static final boolean DEFAULT_XA = false;
 
    public static final boolean DEFAULT_HA = false;
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index f94bf22577..8e5f9d1d06 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -744,6 +744,25 @@ public interface ServerLocator extends AutoCloseable {
     */
    ServerLocator setInitialMessagePacketSize(int size);
 
+   /**
+    * Returns the timeout for onMessage completion when closing 
ClientConsumers created through this factory.
+    * <p>
+    * Value is in milliseconds, default value is {@link 
ActiveMQClient#DEFAULT_ONMESSAGE_CLOSE_TIMEOUT}.
+    *
+    * @return the timeout for onMessage completion when closing 
ClientConsumers created through this factory
+    */
+   int getOnMessageCloseTimeout();
+
+   /**
+    * Sets the timeout in milliseconds for onMessage completion when closing 
ClientConsumers created through this factory.
+    * <p>
+    * A value of -1 means wait until the onMessage completes no matter how 
long it takes.
+    *
+    * @param onMessageCloseTimeout how long to wait in milliseconds for the 
ClientConsumer's MessageHandler's onMessage method to finish before closing or 
stopping the ClientConsumer.
+    * @return this ServerLocator
+    */
+   ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout);
+
    /**
     * Adds an interceptor which will be executed <em>after packets are 
received from the server</em>.
     *
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index ff3a2ed816..1debd394a2 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -36,8 +36,8 @@ public interface ActiveMQClientLogger {
    @LogMessage(id = 212001, value = "Error on clearing messages", level = 
LogMessage.Level.WARN)
    void errorClearingMessages(Throwable e);
 
-   @LogMessage(id = 212002, value = "Timed out waiting for handler to complete 
processing", level = LogMessage.Level.WARN)
-   void timeOutWaitingForProcessing();
+   @LogMessage(id = 212002, value = "Timed out after waiting {}ms for handler 
to complete processing", level = LogMessage.Level.WARN)
+   void timeOutWaitingForProcessing(long duration);
 
    @LogMessage(id = 212003, value = "Unable to close session", level = 
LogMessage.Level.WARN)
    void unableToCloseSession(Exception e);
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index bcb5012b26..871db154df 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -56,8 +56,6 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
-
    private static final int NUM_PRIORITIES = 10;
 
    public static final SimpleString FORCED_DELIVERY_MESSAGE = 
SimpleString.of("_hornetq.forced.delivery.seq");
@@ -137,6 +135,8 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
    private final ClassLoader contextClassLoader;
    private volatile boolean manualFlowManagement;
 
+   private final int onMessageCloseTimeout;
+
    public ClientConsumerImpl(final ClientSessionInternal session,
                              final ConsumerContext consumerContext,
                              final SimpleString queueName,
@@ -151,7 +151,8 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
                              final Executor flowControlExecutor,
                              final SessionContext sessionContext,
                              final ClientSession.QueueQuery queueInfo,
-                             final ClassLoader contextClassLoader) {
+                             final ClassLoader contextClassLoader,
+                             final int onMessageCloseTimeout) {
       this.consumerContext = consumerContext;
 
       this.queueName = queueName;
@@ -182,6 +183,8 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
 
       this.flowControlExecutor = flowControlExecutor;
 
+      this.onMessageCloseTimeout = onMessageCloseTimeout;
+
       if (logger.isTraceEnabled()) {
          logger.trace("{}:: being created at", this, new Exception("trace"));
       }
@@ -921,10 +924,12 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
 
       sessionExecutor.execute(future);
 
-      boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
-
-      if (!ok) {
-         ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
+      if (onMessageCloseTimeout == -1) {
+         future.await();
+      } else {
+         if (!future.await(onMessageCloseTimeout)) {
+            
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(onMessageCloseTimeout);
+         }
       }
    }
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 7298074a1d..1d45bc1080 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -841,7 +841,7 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
 
       SessionContext context = createSessionChannel(name, username, password, 
xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
 
-      ClientSessionInternal session = new ClientSessionImpl(this, name, 
username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, 
serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), 
ackBatchSize, serverLocator.getConsumerWindowSize(), 
serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), 
serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), 
serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
+      ClientSessionInternal session = new ClientSessionImpl(this, name, 
username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, 
serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), 
ackBatchSize, serverLocator.getConsumerWindowSize(), 
serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), 
serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), 
serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
 
       synchronized (sessions) {
          if (closed || !clientProtocolManager.isAlive()) {
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 537fafd0af..f140001f1c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -144,6 +144,8 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
 
    private final String groupID;
 
+   private volatile int onMessageCloseTimeout;
+
    private volatile boolean inClose;
 
    private volatile boolean mayAttemptToFailover = true;
@@ -189,6 +191,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                      final int compressionLevel,
                      final int initialMessagePacketSize,
                      final String groupID,
+                     final int onMessageCloseTimeout,
                      final SessionContext sessionContext,
                      final Executor executor,
                      final Executor confirmationExecutor,
@@ -246,6 +249,8 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
 
       this.groupID = groupID;
 
+      this.onMessageCloseTimeout = onMessageCloseTimeout;
+
       producerCreditManager = new ClientProducerCreditManagerImpl(this, 
producerWindowSize);
 
       this.sessionContext = sessionContext;
@@ -2012,7 +2017,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                                                  final boolean browseOnly) 
throws ActiveMQException {
       checkClosed();
 
-      ClientConsumerInternal consumer = 
sessionContext.createConsumer(queueName, filterString, priority, windowSize, 
maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
+      ClientConsumerInternal consumer = 
sessionContext.createConsumer(queueName, filterString, priority, windowSize, 
maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor, 
onMessageCloseTimeout);
 
       addConsumer(consumer);
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index e235821bc2..7e8418ca65 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -1298,6 +1298,18 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       return this;
    }
 
+   @Override
+   public int getOnMessageCloseTimeout() {
+      return config.onMessageCloseTimeout;
+   }
+
+   @Override
+   public ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout) {
+      checkWrite();
+      config.onMessageCloseTimeout = onMessageCloseTimeout;
+      return this;
+   }
+
    @Override
    public ServerLocatorImpl setGroupID(final String groupID) {
       checkWrite();
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 4170c2e7a9..0ea23600cb 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -400,7 +400,8 @@ public class ActiveMQSessionContext extends SessionContext {
                                                 int ackBatchSize,
                                                 boolean browseOnly,
                                                 Executor executor,
-                                                Executor flowControlExecutor) 
throws ActiveMQException {
+                                                Executor flowControlExecutor,
+                                                int onMessageCloseTimeout) 
throws ActiveMQException {
       long consumerID = idGenerator.generateID();
 
       ActiveMQConsumerContext consumerContext = new 
ActiveMQConsumerContext(consumerID);
@@ -420,7 +421,7 @@ public class ActiveMQSessionContext extends SessionContext {
       // The value we send is just a hint
       final int consumerWindowSize = windowSize == 
ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? 
this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, 
filterString, priority, browseOnly, consumerWindowSize, 
calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new 
TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, 
this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, 
filterString, priority, browseOnly, consumerWindowSize, 
calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new 
TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, 
this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
    }
 
    @Override
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 7c33fff290..550ac75075 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -340,7 +340,8 @@ public abstract class SessionContext {
                                                          int ackBatchSize,
                                                          boolean browseOnly,
                                                          Executor executor,
-                                                         Executor 
flowControlExecutor) throws ActiveMQException;
+                                                         Executor 
flowControlExecutor,
+                                                         int 
onMessageCloseTimeout) throws ActiveMQException;
 
    /**
     * Performs a round trip to the server requesting what is the current tx 
timeout on the session
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
index a5ee1ce9bc..30ea4dca91 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
@@ -41,6 +41,14 @@ public class FutureLatch implements Runnable {
       }
    }
 
+   public void await() {
+      try {
+         latch.await();
+      } catch (InterruptedException e) {
+         // ignore
+      }
+   }
+
    @Override
    public void run() {
       latch.countDown();
diff --git 
a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
 
b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
index a0389be865..eafa4345c2 100644
--- 
a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
+++ 
b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -83,7 +83,8 @@ public class HornetQClientSessionContext extends 
ActiveMQSessionContext {
                                                 int ackBatchSize,
                                                 boolean browseOnly,
                                                 Executor executor,
-                                                Executor flowControlExecutor) 
throws ActiveMQException {
+                                                Executor flowControlExecutor,
+                                                int onMessageCloseTimeout) 
throws ActiveMQException {
       long consumerID = idGenerator.generateID();
 
       ActiveMQConsumerContext consumerContext = new 
ActiveMQConsumerContext(consumerID);
@@ -96,7 +97,7 @@ public class HornetQClientSessionContext extends 
ActiveMQSessionContext {
       // could be overridden on the queue settings
       // The value we send is just a hint
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, 
filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), 
ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, 
executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, 
filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), 
ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, 
executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), 
onMessageCloseTimeout);
    }
 
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
index 614a6abb91..34b78c9d3e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
@@ -16,16 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -34,9 +31,17 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MessageHandlerTest extends ActiveMQTestBase {
 
@@ -117,6 +122,64 @@ public class MessageHandlerTest extends ActiveMQTestBase {
       session.close();
    }
 
+   @Test
+   @Timeout(20)
+   public void testMessageHandlerCloseTimeout() throws Exception {
+      // create Netty acceptor so client can use new onMessageCloseTimeout URL 
parameter
+      server.getRemotingService().createAcceptor("netty", 
"tcp://127.0.0.1:61616").start();
+      final int TIMEOUT = 100;
+      locator = 
ActiveMQClient.createServerLocator("tcp://127.0.0.1:61616?onMessageCloseTimeout="
 + TIMEOUT);
+      sf = createSessionFactory(locator);
+      ClientSession session = addClientSession(sf.createSession(false, true, 
true));
+      session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));
+      ClientProducer producer = session.createProducer(QUEUE);
+      producer.send(createTextMessage(session, "m"));
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+      session.start();
+
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         CountDownLatch beginLatch = new CountDownLatch(1);
+         AtomicBoolean messageHandlerFinished = new AtomicBoolean(false);
+         CountDownLatch completedLatch = new CountDownLatch(1);
+
+         consumer.setMessageHandler(message -> {
+            try {
+               beginLatch.countDown();
+               // don't just Thread.sleep() here because it will be 
interrupted on
+               // ClientConsumer.close()
+               while (!messageHandlerFinished.get()) {
+                  try {
+                     Thread.sleep(10);
+                  } catch (InterruptedException e) {
+                     // ignore
+                  }
+               }
+            } finally {
+               completedLatch.countDown();
+            }
+         });
+
+         try {
+            beginLatch.await();
+            long start = System.currentTimeMillis();
+            consumer.close();
+            long duration = System.currentTimeMillis() - start;
+
+            assertTrue(duration >= TIMEOUT, "Closing consumer took " + 
duration + "ms");
+            assertEquals(1, completedLatch.getCount(), "MessageHandler should 
still be working!");
+         } finally {
+            // don't let the MessageHandler stick around even if an assertion 
failed
+            messageHandlerFinished.set(true);
+         }
+
+         assertTrue(loggerHandler.findText("AMQ212002", TIMEOUT + "ms"), 
"timeout message not found in logs");
+
+         assertTrue(completedLatch.await(10, TimeUnit.SECONDS), 
"MessageHandler should complete!");
+      }
+   }
+
    @Test
    public void testSetResetMessageHandler() throws Exception {
       final ClientSession session = sf.createSession(false, true, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to