Mike Youngstrom created ARTEMIS-2642:
----------------------------------------
Summary: Drain requests with a selector can cause Drain Timeouts
Key: ARTEMIS-2642
URL: https://issues.apache.org/jira/browse/ARTEMIS-2642
Project: ActiveMQ Artemis
Issue Type: Bug
Components: Broker
Affects Versions: 2.11.0
Reporter: Mike Youngstrom
Using the Qpid JMS AMQP client attempting to receive messages with no wait with
a selector can produce very long drain times causing a Drain Timeout on the
client.
Test Case using Qpid AMQP JMS client
{code:java}
public static void main(String[] args) throws Exception {
final String queueName = "queue";
var connectionFactory =
new JmsConnectionFactory(
"tqadmin",
"admin",
"amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&transport.trustAll=true&transport.verifyHost=false&amqp.drainTimeout=10000");
connectionFactory.setExceptionListener(
e -> {
System.out.println("Got a JMSException. Terminating the VM.");
e.printStackTrace();
Runtime.getRuntime().halt(100);
});
var sendCount = new LongAdder();
var consumeCount = new LongAdder();
var consumerThread =
new Thread(
() -> {
try (var listenerContext =
connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
try (var consumer =
listenerContext.createConsumer(
listenerContext.createQueue(queueName),
"selector='dude'")) {
while (!Thread.interrupted()) {
while (consumer.receiveNoWait() != null) {
consumeCount.increment();
long consumed = consumeCount.sum();
if (consumed % 100 == 0) {
System.out.println("Messages Consumed: " + consumed);
}
}
}
}
}
});
consumerThread.start();
try (var context =
connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
final Message message = context.createMessage();
message.setStringProperty("selector", "dude");
var producer = context.createProducer();
var queue = context.createQueue(queueName);
while (sendCount.sum() < 100000 && !Thread.interrupted()) {
producer.send(queue, message);
sendCount.increment();
long sent = sendCount.sum();
if (sent % 100 == 0) {
System.out.println("Messages Sent: " + sent);
}
}
}
}
{code}
Error Thrown after about 2000 messages are consumed (in a default local
environment)
{code:java}
Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not
respond to a drain request in timeException in thread "Thread-0"
javax.jms.JMSRuntimeException: Remote did not respond to a drain request in
time at
org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211)
at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100) at
connections.TQTest2.lambda$1(TQTest2.java:33) at
java.base/java.lang.Thread.run(Thread.java:834)Caused by:
org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not respond to a
drain request in time at
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39)
at
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1)
at
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915) at
org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899) at
org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726)
at org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332)
at
org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221)
at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98) ... 2
moreCaused by:
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException:
Remote did not respond to a drain request in time at
org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179)
at
io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at
io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... 1 more {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)