This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a5f317dcdb ARTEMIS-5093 support configurable onMessage timeout
w/closing consumer
a5f317dcdb is described below
commit a5f317dcdb0d272a6565fa205a893ee78e9fab24
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Oct 10 14:55:54 2024 -0500
ARTEMIS-5093 support configurable onMessage timeout w/closing consumer
---
.../artemis/api/config/ServerLocatorConfig.java | 1 +
.../artemis/api/core/client/ActiveMQClient.java | 2 +
.../artemis/api/core/client/ServerLocator.java | 19 ++++++
.../artemis/core/client/ActiveMQClientLogger.java | 4 +-
.../core/client/impl/ClientConsumerImpl.java | 19 +++---
.../core/client/impl/ClientSessionFactoryImpl.java | 2 +-
.../core/client/impl/ClientSessionImpl.java | 7 ++-
.../core/client/impl/ServerLocatorImpl.java | 12 ++++
.../protocol/core/impl/ActiveMQSessionContext.java | 5 +-
.../artemis/spi/core/remoting/SessionContext.java | 3 +-
.../apache/activemq/artemis/utils/FutureLatch.java | 8 +++
.../client/HornetQClientSessionContext.java | 5 +-
.../integration/client/MessageHandlerTest.java | 73 ++++++++++++++++++++--
13 files changed, 139 insertions(+), 21 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
index dc3180f7a9..cd57f98449 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
@@ -47,6 +47,7 @@ public class ServerLocatorConfig {
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize =
ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+ public int onMessageCloseTimeout =
ActiveMQClient.DEFAULT_ONMESSAGE_CLOSE_TIMEOUT;
public boolean cacheLargeMessagesClient =
ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
public boolean compressLargeMessage =
ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 94feade8e7..e5b5229f1a 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -138,6 +138,8 @@ public final class ActiveMQClient {
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
+ public static final int DEFAULT_ONMESSAGE_CLOSE_TIMEOUT = 10_000;
+
public static final boolean DEFAULT_XA = false;
public static final boolean DEFAULT_HA = false;
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index f94bf22577..8e5f9d1d06 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -744,6 +744,25 @@ public interface ServerLocator extends AutoCloseable {
*/
ServerLocator setInitialMessagePacketSize(int size);
+ /**
+ * Returns the timeout for onMessage completion when closing
ClientConsumers created through this factory.
+ * <p>
+ * Value is in milliseconds, default value is {@link
ActiveMQClient#DEFAULT_ONMESSAGE_CLOSE_TIMEOUT}.
+ *
+ * @return the timeout for onMessage completion when closing
ClientConsumers created through this factory
+ */
+ int getOnMessageCloseTimeout();
+
+ /**
+ * Sets the timeout in milliseconds for onMessage completion when closing
ClientConsumers created through this factory.
+ * <p>
+ * A value of -1 means wait until the onMessage completes no matter how
long it takes.
+ *
+ * @param onMessageCloseTimeout how long to wait in milliseconds for the
ClientConsumer's MessageHandler's onMessage method to finish before closing or
stopping the ClientConsumer.
+ * @return this ServerLocator
+ */
+ ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout);
+
/**
* Adds an interceptor which will be executed <em>after packets are
received from the server</em>.
*
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index ff3a2ed816..1debd394a2 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -36,8 +36,8 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 212001, value = "Error on clearing messages", level =
LogMessage.Level.WARN)
void errorClearingMessages(Throwable e);
- @LogMessage(id = 212002, value = "Timed out waiting for handler to complete
processing", level = LogMessage.Level.WARN)
- void timeOutWaitingForProcessing();
+ @LogMessage(id = 212002, value = "Timed out after waiting {}ms for handler
to complete processing", level = LogMessage.Level.WARN)
+ void timeOutWaitingForProcessing(long duration);
@LogMessage(id = 212003, value = "Unable to close session", level =
LogMessage.Level.WARN)
void unableToCloseSession(Exception e);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index bcb5012b26..871db154df 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -56,8 +56,6 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
-
private static final int NUM_PRIORITIES = 10;
public static final SimpleString FORCED_DELIVERY_MESSAGE =
SimpleString.of("_hornetq.forced.delivery.seq");
@@ -137,6 +135,8 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
private final ClassLoader contextClassLoader;
private volatile boolean manualFlowManagement;
+ private final int onMessageCloseTimeout;
+
public ClientConsumerImpl(final ClientSessionInternal session,
final ConsumerContext consumerContext,
final SimpleString queueName,
@@ -151,7 +151,8 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
final Executor flowControlExecutor,
final SessionContext sessionContext,
final ClientSession.QueueQuery queueInfo,
- final ClassLoader contextClassLoader) {
+ final ClassLoader contextClassLoader,
+ final int onMessageCloseTimeout) {
this.consumerContext = consumerContext;
this.queueName = queueName;
@@ -182,6 +183,8 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
this.flowControlExecutor = flowControlExecutor;
+ this.onMessageCloseTimeout = onMessageCloseTimeout;
+
if (logger.isTraceEnabled()) {
logger.trace("{}:: being created at", this, new Exception("trace"));
}
@@ -921,10 +924,12 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
sessionExecutor.execute(future);
- boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
-
- if (!ok) {
- ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
+ if (onMessageCloseTimeout == -1) {
+ future.await();
+ } else {
+ if (!future.await(onMessageCloseTimeout)) {
+
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(onMessageCloseTimeout);
+ }
}
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 7298074a1d..1d45bc1080 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -841,7 +841,7 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
SessionContext context = createSessionChannel(name, username, password,
xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
- ClientSessionInternal session = new ClientSessionImpl(this, name,
username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge,
serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(),
ackBatchSize, serverLocator.getConsumerWindowSize(),
serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(),
serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(),
serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
+ ClientSessionInternal session = new ClientSessionImpl(this, name,
username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge,
serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(),
ackBatchSize, serverLocator.getConsumerWindowSize(),
serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(),
serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(),
serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 537fafd0af..f140001f1c 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -144,6 +144,8 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
private final String groupID;
+ private volatile int onMessageCloseTimeout;
+
private volatile boolean inClose;
private volatile boolean mayAttemptToFailover = true;
@@ -189,6 +191,7 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
final int compressionLevel,
final int initialMessagePacketSize,
final String groupID,
+ final int onMessageCloseTimeout,
final SessionContext sessionContext,
final Executor executor,
final Executor confirmationExecutor,
@@ -246,6 +249,8 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
this.groupID = groupID;
+ this.onMessageCloseTimeout = onMessageCloseTimeout;
+
producerCreditManager = new ClientProducerCreditManagerImpl(this,
producerWindowSize);
this.sessionContext = sessionContext;
@@ -2012,7 +2017,7 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
final boolean browseOnly)
throws ActiveMQException {
checkClosed();
- ClientConsumerInternal consumer =
sessionContext.createConsumer(queueName, filterString, priority, windowSize,
maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
+ ClientConsumerInternal consumer =
sessionContext.createConsumer(queueName, filterString, priority, windowSize,
maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor,
onMessageCloseTimeout);
addConsumer(consumer);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index e235821bc2..7e8418ca65 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -1298,6 +1298,18 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
return this;
}
+ @Override
+ public int getOnMessageCloseTimeout() {
+ return config.onMessageCloseTimeout;
+ }
+
+ @Override
+ public ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout) {
+ checkWrite();
+ config.onMessageCloseTimeout = onMessageCloseTimeout;
+ return this;
+ }
+
@Override
public ServerLocatorImpl setGroupID(final String groupID) {
checkWrite();
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 4170c2e7a9..0ea23600cb 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -400,7 +400,8 @@ public class ActiveMQSessionContext extends SessionContext {
int ackBatchSize,
boolean browseOnly,
Executor executor,
- Executor flowControlExecutor)
throws ActiveMQException {
+ Executor flowControlExecutor,
+ int onMessageCloseTimeout)
throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new
ActiveMQConsumerContext(consumerID);
@@ -420,7 +421,7 @@ public class ActiveMQSessionContext extends SessionContext {
// The value we send is just a hint
final int consumerWindowSize = windowSize ==
ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ?
this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
- return new ClientConsumerImpl(session, consumerContext, queueName,
filterString, priority, browseOnly, consumerWindowSize,
calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new
TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor,
this, queueInfo.toQueueQuery(), lookupTCCL());
+ return new ClientConsumerImpl(session, consumerContext, queueName,
filterString, priority, browseOnly, consumerWindowSize,
calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new
TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor,
this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}
@Override
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 7c33fff290..550ac75075 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -340,7 +340,8 @@ public abstract class SessionContext {
int ackBatchSize,
boolean browseOnly,
Executor executor,
- Executor
flowControlExecutor) throws ActiveMQException;
+ Executor
flowControlExecutor,
+ int
onMessageCloseTimeout) throws ActiveMQException;
/**
* Performs a round trip to the server requesting what is the current tx
timeout on the session
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
index a5ee1ce9bc..30ea4dca91 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/FutureLatch.java
@@ -41,6 +41,14 @@ public class FutureLatch implements Runnable {
}
}
+ public void await() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
@Override
public void run() {
latch.countDown();
diff --git
a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
index a0389be865..eafa4345c2 100644
---
a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
+++
b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -83,7 +83,8 @@ public class HornetQClientSessionContext extends
ActiveMQSessionContext {
int ackBatchSize,
boolean browseOnly,
Executor executor,
- Executor flowControlExecutor)
throws ActiveMQException {
+ Executor flowControlExecutor,
+ int onMessageCloseTimeout)
throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new
ActiveMQConsumerContext(consumerID);
@@ -96,7 +97,7 @@ public class HornetQClientSessionContext extends
ActiveMQSessionContext {
// could be overridden on the queue settings
// The value we send is just a hint
- return new ClientConsumerImpl(session, consumerContext, queueName,
filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize),
ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null,
executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+ return new ClientConsumerImpl(session, consumerContext, queueName,
filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize),
ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null,
executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(),
onMessageCloseTimeout);
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
index 614a6abb91..34b78c9d3e 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.java
@@ -16,16 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -34,9 +31,17 @@ import
org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MessageHandlerTest extends ActiveMQTestBase {
@@ -117,6 +122,64 @@ public class MessageHandlerTest extends ActiveMQTestBase {
session.close();
}
+ @Test
+ @Timeout(20)
+ public void testMessageHandlerCloseTimeout() throws Exception {
+ // create Netty acceptor so client can use new onMessageCloseTimeout URL
parameter
+ server.getRemotingService().createAcceptor("netty",
"tcp://127.0.0.1:61616").start();
+ final int TIMEOUT = 100;
+ locator =
ActiveMQClient.createServerLocator("tcp://127.0.0.1:61616?onMessageCloseTimeout="
+ TIMEOUT);
+ sf = createSessionFactory(locator);
+ ClientSession session = addClientSession(sf.createSession(false, true,
true));
+ session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));
+ ClientProducer producer = session.createProducer(QUEUE);
+ producer.send(createTextMessage(session, "m"));
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ session.start();
+
+ try (AssertionLoggerHandler loggerHandler = new
AssertionLoggerHandler()) {
+ CountDownLatch beginLatch = new CountDownLatch(1);
+ AtomicBoolean messageHandlerFinished = new AtomicBoolean(false);
+ CountDownLatch completedLatch = new CountDownLatch(1);
+
+ consumer.setMessageHandler(message -> {
+ try {
+ beginLatch.countDown();
+ // don't just Thread.sleep() here because it will be
interrupted on
+ // ClientConsumer.close()
+ while (!messageHandlerFinished.get()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ } finally {
+ completedLatch.countDown();
+ }
+ });
+
+ try {
+ beginLatch.await();
+ long start = System.currentTimeMillis();
+ consumer.close();
+ long duration = System.currentTimeMillis() - start;
+
+ assertTrue(duration >= TIMEOUT, "Closing consumer took " +
duration + "ms");
+ assertEquals(1, completedLatch.getCount(), "MessageHandler should
still be working!");
+ } finally {
+ // don't let the MessageHandler stick around even if an assertion
failed
+ messageHandlerFinished.set(true);
+ }
+
+ assertTrue(loggerHandler.findText("AMQ212002", TIMEOUT + "ms"),
"timeout message not found in logs");
+
+ assertTrue(completedLatch.await(10, TimeUnit.SECONDS),
"MessageHandler should complete!");
+ }
+ }
+
@Test
public void testSetResetMessageHandler() throws Exception {
final ClientSession session = sf.createSession(false, true, true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact