Frank Schwarz created ARTEMIS-5418:
--------------------------------------

             Summary: Scheduled messages cannot be rolled back
                 Key: ARTEMIS-5418
                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5418
             Project: ActiveMQ Artemis
          Issue Type: Bug
          Components: Broker
    Affects Versions: 2.33.0
            Reporter: Frank Schwarz


We are seeing NullPointerExceptions in the Artemis broker when rolling back 
messages with a scheduled delivery time.

For the exception to occur reliably, {{ConsumerWindowSize}} must be set to 
{{0}}.

The exception occurs when a message with a scheduled delivery time is rolled 
back while at the same moment a new message without delivery time is committed 
to the queue.

As a consequence, the rolled-back message disappears. It is still counted by 
the queue's message counter but will not be redelivered until the Artemis 
broker is restarted.
{noformat}
2025-04-10 16:04:58,032 WARN  
[org.apache.activemq.artemis.utils.actors.OrderedExecutor] null
java.lang.NullPointerException: null
        at 
org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl$MessageReferenceComparatorSequence.compare(MessageReferenceImpl.java:50)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl$MessageReferenceComparatorSequence.compare(MessageReferenceImpl.java:42)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.utils.collections.LinkedListImpl.scanLeft(LinkedListImpl.java:286)
 ~[artemis-commons-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.utils.collections.LinkedListImpl.addSorted(LinkedListImpl.java:248)
 ~[artemis-commons-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl.addSorted(PriorityLinkedListImpl.java:93)
 ~[artemis-commons-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.internalAddSorted(QueueImpl.java:3065)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.addSorted(QueueImpl.java:1167)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.addSorted(QueueImpl.java:1202)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.postRollback(QueueImpl.java:4227)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.server.impl.RefsOperation.afterRollback(RefsOperation.java:131)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.transaction.impl.TransactionImpl.afterRollback(TransactionImpl.java:600)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.transaction.impl.TransactionImpl$4.done(TransactionImpl.java:436)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl$1.run(OperationContextImpl.java:320)
 ~[artemis-server-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
 ~[artemis-commons-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
 ~[artemis-commons-2.33.0.jar:2.33.0]
        at 
org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
 ~[artemis-commons-2.33.0.jar:2.33.0]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 [?:?]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 [?:?]
        at 
org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
 [artemis-commons-2.33.0.jar:2.33.0]
{noformat}
Here is the test case:
{code:java}
package org.apache.activemq.artemis.tests.integration.xa;

import java.util.HashMap;

import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;

/**
 * Reproducer for ARTEMIS-5418: rolling back a consumed message that carries a
 * scheduled delivery time while a new, non-scheduled message is committed to
 * the same queue.
 *
 * The statement order in the test method is part of the reproduction; do not
 * reorder the XA start/end/commit/rollback calls.
 */
public class RollbackScheduledMessageTest extends ActiveMQTestBase {

    // Address used for both the queue and the producer/consumer in this test.
    private static final SimpleString ADDRESS = new 
SimpleString("RollbackScheduledMessageTestq");

    // Broker instance created and started in setUp().
    private ActiveMQServer server;

    // Locator and session factory are created inside the test method, not in
    // setUp(), because the consumer window size is set on the locator first.
    private ServerLocator sharedLocator;

    private ClientSessionFactory sharedSf;

    /**
     * Starts a server built from the default in-VM configuration and creates
     * an ANYCAST queue on ADDRESS.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        // No per-address settings are configured; the map is passed empty.
        HashMap<String, AddressSettings> settings = new HashMap<>();
        // NOTE(review): the boolean and the two numeric arguments are
        // presumably persistence, page size and max address size - confirm
        // against ActiveMQTestBase.createServer.
        server = createServer(true, createDefaultInVMConfig(), 10024, 200024, 
settings);
        server.start();
        server.createQueue(new 
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
    }

    /**
     * Sends two scheduled messages in one XA transaction, receives one of
     * them in a second XA transaction, commits an unscheduled third message
     * while the received one is still outstanding, and then rolls the
     * receiving transaction back.
     *
     * NOTE(review): there is no assertion after the rollback. According to
     * the issue report the failure shows up as a NullPointerException logged
     * by the broker, so this method alone will not fail on the bug.
     */
    @Test
    public void testRollbackScheduledMessage() throws Exception {
        sharedLocator = createInVMNonHALocator();
        // The report states that a consumer window size of 0 is needed for
        // the exception to occur reliably.
        sharedLocator.setConsumerWindowSize(0);
        sharedSf = createSessionFactory(sharedLocator);

        // Producer-side session; it is driven through Xid-based
        // start/end/commit calls below.
        ClientSession clientSession = 
addClientSession(sharedSf.createSession(true, false, false));

        // xid covers the two scheduled sends, xid2 the later unscheduled send.
        Xid xid = newXID();
        Xid xid2 = newXID();
        ClientProducer clientProducer = clientSession.createProducer(ADDRESS);

        ClientMessage m1 = createTextMessage(clientSession, "m1");
        ClientMessage m2 = createTextMessage(clientSession, "m2");

        // First transaction: send m1 and m2, each scheduled 100 ms after the
        // moment its delivery time is set, then commit with the one-phase
        // flag set to true.
        clientSession.start(xid, XAResource.TMNOFLAGS);
        m1.setScheduledDeliveryTime(System.currentTimeMillis() + 100);
        clientProducer.send(m1);
        m2.setScheduledDeliveryTime(System.currentTimeMillis() + 100);
        clientProducer.send(m2);
        clientSession.end(xid, XAResource.TMSUCCESS);
        clientSession.commit(xid, true);

        // Consumer-side session with its own transaction branch (xidRec).
        Xid xidRec = newXID();
        ClientSession recSession = 
addClientSession(sharedSf.createSession(true, false, false));
        // The no-arg start() is called before the Xid-based start(), which
        // opens the transaction branch the receive happens in.
        recSession.start();
        recSession.start(xidRec, XAResource.TMNOFLAGS);
        ClientConsumer clientConsumer = recSession.createConsumer(ADDRESS);
        // The receive timeout (500) is longer than the 100 ms scheduling
        // delay, so a message is expected to arrive. It is never acknowledged
        // in this test.
        ClientMessage message = clientConsumer.receive(500);
        assertNotNull(message);

        // commit a new message while processing the first one
        // (m3 has no scheduled delivery time, unlike m1 and m2)
        ClientMessage m3 = createTextMessage(clientSession, "m3");
        clientSession.start(xid2, XAResource.TMNOFLAGS);
        clientProducer.send(m3);
        clientSession.end(xid2, XAResource.TMSUCCESS);
        clientSession.commit(xid2, true);

        // rollback the scheduled message
        // (the branch is ended with TMFAIL before rollback is requested)
        recSession.end(xidRec, XAResource.TMFAIL);

        // the rollback is not successful
        // (per the report: the broker logs a NullPointerException from
        // QueueImpl.postRollback and the message is not redelivered)
        recSession.rollback(xidRec);
   }
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to