Frank Schwarz created ARTEMIS-5418:
--------------------------------------
Summary: Scheduled messages cannot be rolled back
Key: ARTEMIS-5418
URL: https://issues.apache.org/jira/browse/ARTEMIS-5418
Project: ActiveMQ Artemis
Issue Type: Bug
Components: Broker
Affects Versions: 2.33.0
Reporter: Frank Schwarz
We are seeing NullPointerExceptions in the Artemis broker when rolling back
messages with a scheduled delivery time.
For the exception to occur reliably, {{ConsumerWindowSize}} must be set to
{{0}}.
The exception occurs when a message with a scheduled delivery time is rolled
back while at the same moment a new message without delivery time is committed
to the queue.
As a consequence, the rolled-back message disappears. It is still counted by
the queue message counter but will not be redelivered until the Artemis broker
is restarted.
{noformat}
2025-04-10 16:04:58,032 WARN
[org.apache.activemq.artemis.utils.actors.OrderedExecutor] null
java.lang.NullPointerException: null
at
org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl$MessageReferenceComparatorSequence.compare(MessageReferenceImpl.java:50)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl$MessageReferenceComparatorSequence.compare(MessageReferenceImpl.java:42)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.utils.collections.LinkedListImpl.scanLeft(LinkedListImpl.java:286)
~[artemis-commons-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.utils.collections.LinkedListImpl.addSorted(LinkedListImpl.java:248)
~[artemis-commons-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl.addSorted(PriorityLinkedListImpl.java:93)
~[artemis-commons-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.server.impl.QueueImpl.internalAddSorted(QueueImpl.java:3065)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.server.impl.QueueImpl.addSorted(QueueImpl.java:1167)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.server.impl.QueueImpl.addSorted(QueueImpl.java:1202)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.server.impl.QueueImpl.postRollback(QueueImpl.java:4227)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.server.impl.RefsOperation.afterRollback(RefsOperation.java:131)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.transaction.impl.TransactionImpl.afterRollback(TransactionImpl.java:600)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.transaction.impl.TransactionImpl$4.done(TransactionImpl.java:436)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl$1.run(OperationContextImpl.java:320)
~[artemis-server-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
~[artemis-commons-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
~[artemis-commons-2.33.0.jar:2.33.0]
at
org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
~[artemis-commons-2.33.0.jar:2.33.0]
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
at
org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
[artemis-commons-2.33.0.jar:2.33.0]
{noformat}
Here is the test case:
{code:java}
package org.apache.activemq.artemis.tests.integration.xa;
import java.util.HashMap;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
/**
 * Reproducer for ARTEMIS-5418: rolling back a consumed message that carried a
 * scheduled delivery time while another, unscheduled message is committed to
 * the same queue.
 *
 * <p>The test uses a consumer window size of 0 and XA sessions. It contains no
 * assertion after the final rollback; per the accompanying report the failure
 * shows up as a NullPointerException logged by the broker.
 */
public class RollbackScheduledMessageTest extends ActiveMQTestBase {

   /** Address (and queue name) used by every producer and consumer in this test. */
   private static final SimpleString ADDRESS = new SimpleString("RollbackScheduledMessageTestq");

   private ActiveMQServer server;
   private ServerLocator sharedLocator;
   private ClientSessionFactory sharedSf;

   /**
    * Starts an in-VM server and creates the ANYCAST queue the test works on.
    *
    * @throws Exception if the base set-up, the server start or the queue creation fails
    */
   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();

      final HashMap<String, AddressSettings> addressSettings = new HashMap<>();
      server = createServer(true, createDefaultInVMConfig(), 10024, 200024, addressSettings);
      server.start();

      final QueueConfiguration queueConfig = new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST);
      server.createQueue(queueConfig);
   }

   /**
    * Sends two scheduled messages in one XA transaction, receives one of them in a
    * second XA transaction, commits an unscheduled message in between, and finally
    * rolls the receive transaction back.
    *
    * @throws Exception if any client or XA operation fails
    */
   @Test
   public void testRollbackScheduledMessage() throws Exception {
      // Window size 0 is set explicitly; the report states it is needed for a reliable reproduction.
      sharedLocator = createInVMNonHALocator();
      sharedLocator.setConsumerWindowSize(0);
      sharedSf = createSessionFactory(sharedLocator);

      final ClientSession producerSession = addClientSession(sharedSf.createSession(true, false, false));
      final Xid scheduledSendXid = newXID();
      final Xid plainSendXid = newXID();
      final ClientProducer producer = producerSession.createProducer(ADDRESS);
      final ClientMessage firstScheduled = createTextMessage(producerSession, "m1");
      final ClientMessage secondScheduled = createTextMessage(producerSession, "m2");

      // First XA branch: two messages, each scheduled 100 ms into the future.
      producerSession.start(scheduledSendXid, XAResource.TMNOFLAGS);
      firstScheduled.setScheduledDeliveryTime(System.currentTimeMillis() + 100);
      producer.send(firstScheduled);
      secondScheduled.setScheduledDeliveryTime(System.currentTimeMillis() + 100);
      producer.send(secondScheduled);
      producerSession.end(scheduledSendXid, XAResource.TMSUCCESS);
      producerSession.commit(scheduledSendXid, true);

      // Second session: receive one of the scheduled messages inside its own XA branch.
      final Xid receiveXid = newXID();
      final ClientSession consumerSession = addClientSession(sharedSf.createSession(true, false, false));
      consumerSession.start();
      consumerSession.start(receiveXid, XAResource.TMNOFLAGS);
      final ClientConsumer consumer = consumerSession.createConsumer(ADDRESS);
      final ClientMessage received = consumer.receive(500);
      assertNotNull(received);

      // Commit a new, unscheduled message while the received one is still in flight.
      final ClientMessage unscheduled = createTextMessage(producerSession, "m3");
      producerSession.start(plainSendXid, XAResource.TMNOFLAGS);
      producer.send(unscheduled);
      producerSession.end(plainSendXid, XAResource.TMSUCCESS);
      producerSession.commit(plainSendXid, true);

      // Roll back the receive of the scheduled message.
      consumerSession.end(receiveXid, XAResource.TMFAIL);
      // This rollback is the step that does not succeed.
      consumerSession.rollback(receiveXid);
   }
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact