[ 
https://issues.apache.org/jira/browse/ARTEMIS-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712616#comment-17712616
 ] 

Apache Dev edited comment on ARTEMIS-4240 at 4/15/23 9:12 AM:
--------------------------------------------------------------

I reproduced the issue using Artemis broker/client version 2.28.0.
Reproducer is deterministic for me.
This sample ouput shows that messages are processed randomly with a delay of 30 
seconds:
{code:java}
Messages sent: 10

Message received
Message received. Elapsed from previous message: 137 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 7 ms
Message received. Elapsed from previous message: 30002 ms
Message received. Elapsed from previous message: 104 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 6 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 110 ms
{code}
This is the code:
{code:java}
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

    private static final String BROKER_URL = 
"tcp://localhost:61616?minLargeMessageSize=2147483647";
    private static final String QUEUE = "testQueue";

    private static final int MESSAGE_SIZE_BYTES = 1024 * 1024;
    private static final int NUMBER_OF_MESSAGES = 10;

    private static final ExecutorService executor = 
Executors.newCachedThreadPool();

    private static final CountDownLatch completionLatch = new 
CountDownLatch(NUMBER_OF_MESSAGES);

    public static void main(String[] args) throws Exception {

        // create large payload
        char[] chars = new char[MESSAGE_SIZE_BYTES];
        Arrays.fill(chars, '0');
        String largePayload = new String(chars);

        // create connection
        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();

        // register a single consumer that processes messages using 
ExecutorService
        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(QUEUE);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {

            final AtomicLong lastProcessedTimestamp = new AtomicLong();

            @Override
            public void onMessage(Message message) {
                long now = System.currentTimeMillis();
                if (lastProcessedTimestamp.compareAndSet(0, now)) {
                    System.out.println("Message received");
                } else {
                    long before = lastProcessedTimestamp.getAndSet(now);
                    System.out.printf("Message received. Elapsed from previous 
message: %d ms%n", now - before);
                }

                // process message asynchronously
                executor.execute(new Runnable() {

                    @Override
                    public void run() {
                        // here we read the message
                        try {
                            BytesMessage bytesMessage = (BytesMessage) message;
                            byte[] buff = new byte[(int) 
bytesMessage.getBodyLength()];
                            bytesMessage.readBytes(buff);
                        } catch (JMSException e) {
                            throw new RuntimeException(e);
                        }
                        completionLatch.countDown();
                    }
                });
            }
        });

        // send messages
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            Session sessionSend = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
            Destination destinationSend = sessionSend.createQueue(QUEUE);
            MessageProducer producer = 
sessionSend.createProducer(destinationSend);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            BytesMessage message = sessionSend.createBytesMessage();
            message.writeBytes(largePayload.getBytes());
            producer.send(message);
        }
        System.out.printf("Messages sent: %d%n%n", NUMBER_OF_MESSAGES);

        connection.start();
        completionLatch.await();
        connection.close();
        executor.shutdown();
    }
}
{code}


was (Author: apachedev):
I reproduced the issue using Artemis broker/client version 2.28.0.
Reproducer is deterministic for me.
This sample ouput shows that messages are processed randomly with a delay of 30 
seconds:

{code:java}
Messages sent: 10

Message received
Message received. Elapsed from previous message: 137 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 7 ms
Message received. Elapsed from previous message: 30002 ms
Message received. Elapsed from previous message: 104 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 6 ms
Message received. Elapsed from previous message: 30001 ms
Message received. Elapsed from previous message: 110 ms
{code}

This is the code:

{code:java}
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

    private static final String BROKER_URL = 
"tcp://localhost:61616?minLargeMessageSize=2147483647";
    private static final String QUEUE = "testQueue";

    private static final int MESSAGE_SIZE_BYTES = 1024 * 1024;
    private static final int NUMBER_OF_MESSAGES = 10;

    private static final ExecutorService executor = 
Executors.newCachedThreadPool();

    private static final CountDownLatch completionLatch = new 
CountDownLatch(NUMBER_OF_MESSAGES);

    public static void main(String[] args) throws Exception {

        // create large payload
        char[] chars = new char[MESSAGE_SIZE_BYTES];
        Arrays.fill(chars, '0');
        String largePayload = new String(chars);

        // create connection
        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();

        // register a single consumer that processes messages using 
ExecutorService
        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(QUEUE);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {

            final AtomicLong lastProcessedTimestamp = new AtomicLong();

            @Override
            public void onMessage(Message message) {
                long now = System.currentTimeMillis();
                if (lastProcessedTimestamp.compareAndSet(0, now)) {
                    System.out.println("Message received");
                } else {
                    long before = lastProcessedTimestamp.getAndSet(now);
                    System.out.printf("Message received. Elapsed from previous 
message: %d ms%n", now - before);
                }

                // process message asynchronously
                executor.execute(new Runnable() {

                    @Override
                    public void run() {
                        // here we read the message
                        try {
                            BytesMessage bytesMessage = (BytesMessage) message;
                            byte[] buff = new byte[(int) 
bytesMessage.getBodyLength()];
                            bytesMessage.readBytes(buff);
                        } catch (JMSException e) {
                            throw new RuntimeException(e);
                        }
                        completionLatch.countDown();
                    }
                });
            }
        });

        // send two messages
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            Session sessionSend = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
            Destination destinationSend = sessionSend.createQueue(QUEUE);
            MessageProducer producer = 
sessionSend.createProducer(destinationSend);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            BytesMessage message = sessionSend.createBytesMessage();
            message.writeBytes(largePayload.getBytes());
            producer.send(message);
        }
        System.out.printf("Messages sent: %d%n%n", NUMBER_OF_MESSAGES);

        connection.start();
        completionLatch.await();
        connection.close();
        executor.shutdown();
    }
}
{code}

> Consumer stuck handling Large Message
> -------------------------------------
>
>                 Key: ARTEMIS-4240
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4240
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker, JMS
>    Affects Versions: 2.19.1, 2.28.0
>            Reporter: Apache Dev
>            Priority: Critical
>
> In this scenario:
>  * "core" protocol
>  * JMS consumer APIs
>  * non-persistent messaging
>  * client connection configured with {{minLargeMessageSize=2147483647}} in 
> order to disable Large Messages
> a consumer receives correctly all messages having a small size.
> But when a message > 1Mib is received, the consumer thread is stuck for 30 
> seconds, with this stack:
> {noformat}
> 3XMTHREADINFO      "Thread-3 (ActiveMQ-client-global-threads)" 
> J9VMThread:0x00000000006D9300, omrthread_t:0x00007FB7A002C248, 
> java/lang/Thread:0x00000000E0BD22F8, state:P, prio=5
> 3XMJAVALTHREAD            (java/lang/Thread getId:0x37, isDaemon:true)
> 3XMJAVALTHRCCL            sun/misc/Launcher$AppClassLoader(0x00000000E00298C0)
> 3XMTHREADINFO1            (native thread ID:0xDD80, native priority:0x5, 
> native policy:UNKNOWN, vmstate:P, vm thread flags:0x000a0081)
> 3XMTHREADINFO2            (native stack address range 
> from:0x00007FB85EE4F000, to:0x00007FB85EE8F000, size:0x40000)
> 3XMCPUTIME               CPU usage total: 0.025981590 secs, current 
> category="Application"
> 3XMTHREADBLOCK     Parked on: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x00000000E0DA3B80
>  Owned by: <unknown>
> 3XMHEAPALLOC             Heap bytes allocated since last GC cycle=0 (0x0)
> 3XMTHREADINFO3           Java callstack:
> 4XESTACKTRACE                at sun/misc/Unsafe.park(Native Method)
> 4XESTACKTRACE                at 
> java/util/concurrent/locks/LockSupport.parkNanos(LockSupport.java:226)
> 4XESTACKTRACE                at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2089)
> 4XESTACKTRACE                at 
> java/util/concurrent/LinkedBlockingQueue.poll(LinkedBlockingQueue.java:478)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.popPacket(LargeMessageControllerImpl.java:1123)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.checkForPacket(LargeMessageControllerImpl.java:1167)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.discardUnusedPackets(LargeMessageControllerImpl.java:135)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.discardBody(ClientLargeMessageImpl.java:142)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1031)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.access$400(ClientConsumerImpl.java:49)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1129)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/utils/actors/OrderedExecutor.doTask(OrderedExecutor.java:42)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/utils/actors/OrderedExecutor.doTask(OrderedExecutor.java:31)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/utils/actors/ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/utils/actors/ProcessorBase$$Lambda$6/0x00000000e00d1330.run(Bytecode
>  PC:4)
> 4XESTACKTRACE                at 
> java/util/concurrent/ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160(Compiled
>  Code))
> 4XESTACKTRACE                at 
> java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> 4XESTACKTRACE                at 
> org/apache/activemq/artemis/utils/ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> 3XMTHREADINFO3           Native callstack:
> 4XENATIVESTACK                (0x00007FB863821952 [libj9prt29.so+0x5c952])
> 4XENATIVESTACK                (0x00007FB8637EC7E3 [libj9prt29.so+0x277e3])
> 4XENATIVESTACK                (0x00007FB863821E4A [libj9prt29.so+0x5ce4a])
> 4XENATIVESTACK                (0x00007FB8637EC7E3 [libj9prt29.so+0x277e3])
> 4XENATIVESTACK                (0x00007FB8638217E4 [libj9prt29.so+0x5c7e4])
> 4XENATIVESTACK                (0x00007FB86381DB3F [libj9prt29.so+0x58b3f])
> 4XENATIVESTACK                (0x00007FB8698B7420 [libpthread.so.0+0x14420])
> 4XENATIVESTACK               pthread_cond_timedwait+0x271 (0x00007FB8698B27D1 
> [libpthread.so.0+0xf7d1])
> 4XENATIVESTACK               omrthread_park+0x184 (0x00007FB8680C9454 
> [libj9thr29.so+0x7454])
> 4XENATIVESTACK                (0x00007FB863D35744 [libj9vm29.so+0xdb744])
> 4XENATIVESTACK                (0x00007FB84DC354B7 [<unknown>+0x0]){noformat} 
> At that point, the consumer thread has already invoked the application 
> "onMessage" callback, but then it blocks causing the entire consumer to block 
> and not process any new message for 30 seconds.
> In all cases, no errors or message lost has been detected. Just slowdown due 
> to the blocked consumer.
> "Large messages" are disabled client-side, however we noticed that broker 
> decides if message has to be converted to large when sending it.
> Broker uses the following method in 
> org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl#checkLargeMessage:
> {code:java}
>    /** This will check if a regular message needs to be converted as large 
> message */
>    public static Message checkLargeMessage(Message message, StorageManager 
> storageManager) throws Exception {
>       if (message.isLargeMessage()) {
>          return message; // nothing to be done on this case
>       }
>       if (message.getEncodeSize() + ESTIMATE_RECORD_TRAIL > 
> storageManager.getMaxRecordSize()) {
>          return asLargeMessage(message, storageManager);
>       } else {
>          return message;
>       }
>    }{code}
> As a workaround, it is possible to reconfigure broker journal to use the 
> following configurations, so that {{storageManager.getMaxRecordSize()}} 
> always returns a big number, ensuring that message is never converted to 
> Large:
> {code:xml}
>         <journal-file-size>2147483647</journal-file-size>
>         <journal-buffer-size>2147483647</journal-buffer-size>
> {code}
> Not clear to me why broker converts the message to Large even if client does 
> not require it.
> And why broker decides this according to the record size of storage, as in 
> this scenario only non-persistent messages are used. 
> Possibly related to ARTEMIS-3809.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to