Viliam Durina created ARTEMIS-2546:
--------------------------------------
Summary: Message loss when consumers consume and disappear without
acknowledging
Key: ARTEMIS-2546
URL: https://issues.apache.org/jira/browse/ARTEMIS-2546
Project: ActiveMQ Artemis
Issue Type: Bug
Affects Versions: 2.10.1
Environment: I'm on Windows 10, Oracle JVM 11.0.1
Reporter: Viliam Durina
I am stress testing my product, checking that I commit messages
properly and that I get exactly-once delivery. I have multiple parallel
consumers of a queue and I restart them and check for correct results. But
occasionally I miss the initial messages. By checking the logs I found out that
those were never returned from the `MessageConsumer.receive()` method, so I
thought it's not me incorrectly committing and rolling back. I managed to write
a standalone test that reproduces this issue. On my machine it fails in about
50% of cases, but I surmise that it might be micro-timing dependent, so you
might tweak sleep times and what not.
The test code:
{noformat}
import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.junit.EmbeddedActiveMQResource;
import org.junit.ClassRule;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.junit.Assert.assertTrue;
public class ArtemisTest_NoInterrupts {
@ClassRule
public static EmbeddedActiveMQResource resource = new
EmbeddedActiveMQResource();
private volatile boolean receivedMsg0;
@Test
public void test() throws InterruptedException {
AtomicBoolean driverShutdownFlag = new AtomicBoolean();
Thread consumerDriverThread = new Thread(() ->
consumerDriver(driverShutdownFlag));
Thread producerThread = new Thread(this::producer);
producerThread.start();
consumerDriverThread.start();
producerThread.join();
driverShutdownFlag.set(true);
consumerDriverThread.join();
assertTrue("msg-0 not received", receivedMsg0);
}
private void consumerDriver(AtomicBoolean driverShutdownFlag) {
try {
Xid xid1 = new MyXid(new byte[] {1});
Xid xid2 = new MyXid(new byte[] {2});
for (int id = 0; !driverShutdownFlag.get(); id++) {
int finalId = id;
Thread thread1 = null;
Thread thread2 = null;
try {
AtomicBoolean shutdownFlag = new AtomicBoolean();
thread1 = new Thread(() -> consumer(xid1, finalId,
shutdownFlag));
thread2 = new Thread(() -> consumer(xid2, finalId,
shutdownFlag));
thread1.start();
thread2.start();
Thread.sleep(15 + id < 4 ? 0 :
ThreadLocalRandom.current().nextInt(100));
shutdownFlag.set(true);
} finally {
if (thread1 != null) {
thread1.join();
}
if (thread2 != null) {
thread2.join();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void consumer(Xid xid, int id, AtomicBoolean shutdownFlag) {
try (
XAConnection conn = getConnectionFactory().createXAConnection();
XASession sess = conn.createXASession()
) {
conn.start();
XAResource res = sess.getXAResource();
// rollback the xid first
try {
res.rollback(xid);
System.out.println("consumer " + id + " rolled back, xid=" +
xid);
} catch (XAException e) {
if (e.errorCode == XAException.XAER_NOTA) {
System.out.println("consumer " + id + " ignoring rollback,
XAER_NOTA, xid=" + xid);
}
else throw e;
}
res.start(xid, XAResource.TMNOFLAGS);
MessageConsumer consumer =
sess.createConsumer(sess.createQueue("queue"));
for (int msgCnt = 0; !shutdownFlag.get(); ) {
TextMessage msg = (TextMessage) consumer.receiveNoWait();
if (msg == null) {
continue;
}
msgCnt++;
if (msg.getText().equals("msg-0")) {
receivedMsg0 = true;
}
System.out.println("consumer " + id + " received: " +
msg.getText() + ", xid=" + xid);
if (msgCnt % 10 == 9) {
res.end(xid, XAResource.TMSUCCESS);
res.prepare(xid);
System.out.println("consumer " + id + " going to commit...,
xid=" + xid);
res.commit(xid, false);
System.out.println("consumer " + id + " committed, xid=" +
xid);
res.start(xid, XAResource.TMNOFLAGS);
}
}
System.out.println("consumer " + id + " shut down, xid=" + xid);
} catch (XAException e) {
throw new RuntimeException("XAException, errorCode=" + e.errorCode,
e);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
private void producer() {
try (
Connection conn = ((ConnectionFactory)
getConnectionFactory()).createConnection();
Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
MessageProducer producer =
session.createProducer(session.createQueue("queue"))
) {
long startTime = System.nanoTime();
for (int i = 0; i < 1_000; i++) {
producer.send(session.createTextMessage("msg-" + i));
Thread.sleep(Math.max(0, i -
NANOSECONDS.toMillis(System.nanoTime() - startTime)));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static XAConnectionFactory getConnectionFactory() {
return new ActiveMQXAConnectionFactory(resource.getVmURL());
}
private static final class MyXid implements Xid {
private final byte[] gtrid;
private MyXid(byte[] gtrid) {
this.gtrid = gtrid;
}
@Override
public int getFormatId() {
return 9999;
}
@Override
public byte[] getGlobalTransactionId() {
return gtrid;
}
@Override
public byte[] getBranchQualifier() {
return new byte[0];
}
@Override
public String toString() {
return Arrays.toString(gtrid);
}
}
}
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)