[
https://issues.apache.org/jira/browse/ARTEMIS-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712616#comment-17712616
]
Apache Dev edited comment on ARTEMIS-4240 at 4/15/23 9:12 AM:
--------------------------------------------------------------
I reproduced the issue using Artemis broker/client version 2.28.0.
Reproducer is deterministic for me.
This sample ouput shows that messages are processed randomly with a delay of 30
seconds:
{code:java}
Messages sent: 10
Message received
Message received. Elapsed from previous message: 137 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 7 ms
Message received. Elapsed from previous message: 30002 ms
Message received. Elapsed from previous message: 104 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 6 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 110 ms
{code}
This is the code:
{code:java}
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class Main {
private static final String BROKER_URL =
"tcp://localhost:61616?minLargeMessageSize=2147483647";
private static final String QUEUE = "testQueue";
private static final int MESSAGE_SIZE_BYTES = 1024 * 1024;
private static final int NUMBER_OF_MESSAGES = 10;
private static final ExecutorService executor =
Executors.newCachedThreadPool();
private static final CountDownLatch completionLatch = new
CountDownLatch(NUMBER_OF_MESSAGES);
public static void main(String[] args) throws Exception {
// create large payload
char[] chars = new char[MESSAGE_SIZE_BYTES];
Arrays.fill(chars, '0');
String largePayload = new String(chars);
// create connection
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
// register a single consumer that processes messages using
ExecutorService
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
final AtomicLong lastProcessedTimestamp = new AtomicLong();
@Override
public void onMessage(Message message) {
long now = System.currentTimeMillis();
if (lastProcessedTimestamp.compareAndSet(0, now)) {
System.out.println("Message received");
} else {
long before = lastProcessedTimestamp.getAndSet(now);
System.out.printf("Message received. Elapsed from previous
message: %d ms%n", now - before);
}
// process message asynchronously
executor.execute(new Runnable() {
@Override
public void run() {
// here we read the message
try {
BytesMessage bytesMessage = (BytesMessage) message;
byte[] buff = new byte[(int)
bytesMessage.getBodyLength()];
bytesMessage.readBytes(buff);
} catch (JMSException e) {
throw new RuntimeException(e);
}
completionLatch.countDown();
}
});
}
});
// send messages
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Session sessionSend = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destinationSend = sessionSend.createQueue(QUEUE);
MessageProducer producer =
sessionSend.createProducer(destinationSend);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = sessionSend.createBytesMessage();
message.writeBytes(largePayload.getBytes());
producer.send(message);
}
System.out.printf("Messages sent: %d%n%n", NUMBER_OF_MESSAGES);
connection.start();
completionLatch.await();
connection.close();
executor.shutdown();
}
}
{code}
was (Author: apachedev):
I reproduced the issue using Artemis broker/client version 2.28.0.
Reproducer is deterministic for me.
This sample ouput shows that messages are processed randomly with a delay of 30
seconds:
{code:java}
Messages sent: 10
Message received
Message received. Elapsed from previous message: 137 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 7 ms
Message received. Elapsed from previous message: 30002 ms
Message received. Elapsed from previous message: 104 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 6 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 110 ms
{code}
This is the code:
{code:java}
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class Main {
private static final String BROKER_URL =
"tcp://localhost:61616?minLargeMessageSize=2147483647";
private static final String QUEUE = "testQueue";
private static final int MESSAGE_SIZE_BYTES = 1024 * 1024;
private static final int NUMBER_OF_MESSAGES = 10;
private static final ExecutorService executor =
Executors.newCachedThreadPool();
private static final CountDownLatch completionLatch = new
CountDownLatch(NUMBER_OF_MESSAGES);
public static void main(String[] args) throws Exception {
// create large payload
char[] chars = new char[MESSAGE_SIZE_BYTES];
Arrays.fill(chars, '0');
String largePayload = new String(chars);
// create connection
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
// register a single consumer that processes messages using
ExecutorService
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
final AtomicLong lastProcessedTimestamp = new AtomicLong();
@Override
public void onMessage(Message message) {
long now = System.currentTimeMillis();
if (lastProcessedTimestamp.compareAndSet(0, now)) {
System.out.println("Message received");
} else {
long before = lastProcessedTimestamp.getAndSet(now);
System.out.printf("Message received. Elapsed from previous
message: %d ms%n", now - before);
}
// process message asynchronously
executor.execute(new Runnable() {
@Override
public void run() {
// here we read the message
try {
BytesMessage bytesMessage = (BytesMessage) message;
byte[] buff = new byte[(int)
bytesMessage.getBodyLength()];
bytesMessage.readBytes(buff);
} catch (JMSException e) {
throw new RuntimeException(e);
}
completionLatch.countDown();
}
});
}
});
// send two messages
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Session sessionSend = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destinationSend = sessionSend.createQueue(QUEUE);
MessageProducer producer =
sessionSend.createProducer(destinationSend);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = sessionSend.createBytesMessage();
message.writeBytes(largePayload.getBytes());
producer.send(message);
}
System.out.printf("Messages sent: %d%n%n", NUMBER_OF_MESSAGES);
connection.start();
completionLatch.await();
connection.close();
executor.shutdown();
}
}
{code}
> Consumer stuck handling Large Message
> -------------------------------------
>
> Key: ARTEMIS-4240
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4240
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: Broker, JMS
> Affects Versions: 2.19.1, 2.28.0
> Reporter: Apache Dev
> Priority: Critical
>
> In this scenario:
> * "core" protocol
> * JMS consumer APIs
> * non-persistent messaging
> * client connection configured with {{minLargeMessageSize=2147483647}} in
> order to disable Large Messages
> a consumer receives correctly all messages having a small size.
> But when a message > 1Mib is received, the consumer thread is stuck for 30
> seconds, with this stack:
> {noformat}
> 3XMTHREADINFO "Thread-3 (ActiveMQ-client-global-threads)"
> J9VMThread:0x00000000006D9300, omrthread_t:0x00007FB7A002C248,
> java/lang/Thread:0x00000000E0BD22F8, state:P, prio=5
> 3XMJAVALTHREAD (java/lang/Thread getId:0x37, isDaemon:true)
> 3XMJAVALTHRCCL sun/misc/Launcher$AppClassLoader(0x00000000E00298C0)
> 3XMTHREADINFO1 (native thread ID:0xDD80, native priority:0x5,
> native policy:UNKNOWN, vmstate:P, vm thread flags:0x000a0081)
> 3XMTHREADINFO2 (native stack address range
> from:0x00007FB85EE4F000, to:0x00007FB85EE8F000, size:0x40000)
> 3XMCPUTIME CPU usage total: 0.025981590 secs, current
> category="Application"
> 3XMTHREADBLOCK Parked on:
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x00000000E0DA3B80
> Owned by: <unknown>
> 3XMHEAPALLOC Heap bytes allocated since last GC cycle=0 (0x0)
> 3XMTHREADINFO3 Java callstack:
> 4XESTACKTRACE at sun/misc/Unsafe.park(Native Method)
> 4XESTACKTRACE at
> java/util/concurrent/locks/LockSupport.parkNanos(LockSupport.java:226)
> 4XESTACKTRACE at
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2089)
> 4XESTACKTRACE at
> java/util/concurrent/LinkedBlockingQueue.poll(LinkedBlockingQueue.java:478)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.popPacket(LargeMessageControllerImpl.java:1123)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.checkForPacket(LargeMessageControllerImpl.java:1167)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.discardUnusedPackets(LargeMessageControllerImpl.java:135)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.discardBody(ClientLargeMessageImpl.java:142)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1031)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.access$400(ClientConsumerImpl.java:49)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1129)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/utils/actors/OrderedExecutor.doTask(OrderedExecutor.java:42)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/utils/actors/OrderedExecutor.doTask(OrderedExecutor.java:31)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/utils/actors/ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/utils/actors/ProcessorBase$$Lambda$6/0x00000000e00d1330.run(Bytecode
> PC:4)
> 4XESTACKTRACE at
> java/util/concurrent/ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160(Compiled
> Code))
> 4XESTACKTRACE at
> java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> 4XESTACKTRACE at
> org/apache/activemq/artemis/utils/ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> 3XMTHREADINFO3 Native callstack:
> 4XENATIVESTACK (0x00007FB863821952 [libj9prt29.so+0x5c952])
> 4XENATIVESTACK (0x00007FB8637EC7E3 [libj9prt29.so+0x277e3])
> 4XENATIVESTACK (0x00007FB863821E4A [libj9prt29.so+0x5ce4a])
> 4XENATIVESTACK (0x00007FB8637EC7E3 [libj9prt29.so+0x277e3])
> 4XENATIVESTACK (0x00007FB8638217E4 [libj9prt29.so+0x5c7e4])
> 4XENATIVESTACK (0x00007FB86381DB3F [libj9prt29.so+0x58b3f])
> 4XENATIVESTACK (0x00007FB8698B7420 [libpthread.so.0+0x14420])
> 4XENATIVESTACK pthread_cond_timedwait+0x271 (0x00007FB8698B27D1
> [libpthread.so.0+0xf7d1])
> 4XENATIVESTACK omrthread_park+0x184 (0x00007FB8680C9454
> [libj9thr29.so+0x7454])
> 4XENATIVESTACK (0x00007FB863D35744 [libj9vm29.so+0xdb744])
> 4XENATIVESTACK (0x00007FB84DC354B7 [<unknown>+0x0]){noformat}
> At that point, the consumer thread has already invoked the application
> "onMessage" callback, but then it blocks causing the entire consumer to block
> and not process any new message for 30 seconds.
> In all cases, no errors or message lost has been detected. Just slowdown due
> to the blocked consumer.
> "Large messages" are disabled client-side, however we noticed that broker
> decides if message has to be converted to large when sending it.
> Broker uses the following method in
> org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl#checkLargeMessage:
> {code:java}
> /** This will check if a regular message needs to be converted as large
> message */
> public static Message checkLargeMessage(Message message, StorageManager
> storageManager) throws Exception {
> if (message.isLargeMessage()) {
> return message; // nothing to be done on this case
> }
> if (message.getEncodeSize() + ESTIMATE_RECORD_TRAIL >
> storageManager.getMaxRecordSize()) {
> return asLargeMessage(message, storageManager);
> } else {
> return message;
> }
> }{code}
> As a workaround, it is possible to reconfigure broker journal to use the
> following configurations, so that {{storageManager.getMaxRecordSize()}}
> always returns a big number, ensuring that message is never converted to
> Large:
> {code:xml}
> <journal-file-size>2147483647</journal-file-size>
> <journal-buffer-size>2147483647</journal-buffer-size>
> {code}
> Not clear to me why broker converts the message to Large even if client does
> not require it.
> And why broker decides this according to the record size of storage, as in
> this scenario only non-persistent messages are used.
> Possibly related to ARTEMIS-3809.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)