[
https://issues.apache.org/jira/browse/ARTEMIS-5418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Justin Bertram resolved ARTEMIS-5418.
-------------------------------------
Fix Version/s: 2.40.0
Resolution: Duplicate
> Scheduled messages cannot be rolled back
> ----------------------------------------
>
> Key: ARTEMIS-5418
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5418
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: Broker
> Affects Versions: 2.33.0
> Reporter: Frank Schwarz
> Priority: Major
> Fix For: 2.40.0
>
>
> We are seeing NullPointerExceptions in the Artemis broker when rolling back
> messages with a scheduled delivery time.
> For the exception to occur reliably, {{ConsumerWindowSize}} must be set to
> {{0}}.
> The exception occurs when a message with a scheduled delivery time is rolled
> back while at the same moment a new message without delivery time is
> committed to the queue.
> As a consequence, the rolled back message disappears. It is still visible to
> the queue message counter but will not be redelivered until you restart the
> Artemis broker.
> {noformat}
> 2025-04-10 16:04:58,032 WARN
> [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null
> java.lang.NullPointerException: null
> at
> org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl$MessageReferenceComparatorSequence.compare(MessageReferenceImpl.java:50)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl$MessageReferenceComparatorSequence.compare(MessageReferenceImpl.java:42)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.utils.collections.LinkedListImpl.scanLeft(LinkedListImpl.java:286)
> ~[artemis-commons-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.utils.collections.LinkedListImpl.addSorted(LinkedListImpl.java:248)
> ~[artemis-commons-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl.addSorted(PriorityLinkedListImpl.java:93)
> ~[artemis-commons-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.server.impl.QueueImpl.internalAddSorted(QueueImpl.java:3065)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.server.impl.QueueImpl.addSorted(QueueImpl.java:1167)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.server.impl.QueueImpl.addSorted(QueueImpl.java:1202)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.server.impl.QueueImpl.postRollback(QueueImpl.java:4227)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.server.impl.RefsOperation.afterRollback(RefsOperation.java:131)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.transaction.impl.TransactionImpl.afterRollback(TransactionImpl.java:600)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.transaction.impl.TransactionImpl$4.done(TransactionImpl.java:436)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl$1.run(OperationContextImpl.java:320)
> ~[artemis-server-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
> ~[artemis-commons-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
> ~[artemis-commons-2.33.0.jar:2.33.0]
> at
> org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
> ~[artemis-commons-2.33.0.jar:2.33.0]
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> [artemis-commons-2.33.0.jar:2.33.0]
> {noformat}
> Here is the test case:
> {code:java}
> package org.apache.activemq.artemis.tests.integration.xa;
> import java.util.HashMap;
> import javax.transaction.xa.XAResource;
> import javax.transaction.xa.Xid;
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.api.core.RoutingType;
> import org.apache.activemq.artemis.api.core.SimpleString;
> import org.apache.activemq.artemis.api.core.client.ClientConsumer;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientProducer;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
> import org.apache.activemq.artemis.api.core.client.ServerLocator;
> import org.apache.activemq.artemis.core.server.ActiveMQServer;
> import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
> import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
> import org.junit.Before;
> import org.junit.Test;
> public class RollbackScheduledMessageTest extends ActiveMQTestBase {
> private static final SimpleString ADDRESS = new
> SimpleString("RollbackScheduledMessageTestq");
> private ActiveMQServer server;
> private ServerLocator sharedLocator;
> private ClientSessionFactory sharedSf;
> @Override
> @Before
> public void setUp() throws Exception {
> super.setUp();
> HashMap<String, AddressSettings> settings = new HashMap<>();
> server = createServer(true, createDefaultInVMConfig(), 10024, 200024,
> settings);
> server.start();
> server.createQueue(new
> QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
> }
> @Test
> public void testRollbackScheduledMessage() throws Exception {
> sharedLocator = createInVMNonHALocator();
> sharedLocator.setConsumerWindowSize(0);
> sharedSf = createSessionFactory(sharedLocator);
> ClientSession clientSession =
> addClientSession(sharedSf.createSession(true, false, false));
> Xid xid = newXID();
> Xid xid2 = newXID();
> ClientProducer clientProducer = clientSession.createProducer(ADDRESS);
> ClientMessage m1 = createTextMessage(clientSession, "m1");
> ClientMessage m2 = createTextMessage(clientSession, "m2");
> clientSession.start(xid, XAResource.TMNOFLAGS);
> m1.setScheduledDeliveryTime(System.currentTimeMillis() + 100);
> clientProducer.send(m1);
> m2.setScheduledDeliveryTime(System.currentTimeMillis() + 100);
> clientProducer.send(m2);
> clientSession.end(xid, XAResource.TMSUCCESS);
> clientSession.commit(xid, true);
> Xid xidRec = newXID();
> ClientSession recSession =
> addClientSession(sharedSf.createSession(true, false, false));
> recSession.start();
> recSession.start(xidRec, XAResource.TMNOFLAGS);
> ClientConsumer clientConsumer = recSession.createConsumer(ADDRESS);
> ClientMessage message = clientConsumer.receive(500);
> assertNotNull(message);
> // commit a new message while processing the first one
> ClientMessage m3 = createTextMessage(clientSession, "m3");
> clientSession.start(xid2, XAResource.TMNOFLAGS);
> clientProducer.send(m3);
> clientSession.end(xid2, XAResource.TMSUCCESS);
> clientSession.commit(xid2, true);
> // rollback the scheduled message
> recSession.end(xidRec, XAResource.TMFAIL);
> // the rollback is not successful
> recSession.rollback(xidRec);
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact