Viliam Durina created ARTEMIS-2546:
--------------------------------------

             Summary: Message loss when consumers consume and disappear without 
acknowledging
                 Key: ARTEMIS-2546
                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2546
             Project: ActiveMQ Artemis
          Issue Type: Bug
    Affects Versions: 2.10.1
         Environment: I'm on Windows 10, Oracle JVM 11.0.1
            Reporter: Viliam Durina


I am stress testing my product to check that I commit messages properly and 
that I get exactly-once delivery. I have multiple parallel consumers of a 
queue; I restart them repeatedly and check for correct results. Occasionally, 
however, the initial messages are missing. By checking the logs I found out 
that those messages were never returned from the `MessageConsumer.receive()` 
method, so I concluded that the cause is not incorrect committing or rolling 
back on my side. I managed to write a standalone test that reproduces this 
issue. On my machine it fails in about 50% of cases, but I surmise that it is 
micro-timing dependent, so you might need to tweak the sleep times and so on.

The test code:

{noformat}
import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.junit.EmbeddedActiveMQResource;
import org.junit.ClassRule;
import org.junit.Test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.junit.Assert.assertTrue;

public class ArtemisTest_NoInterrupts {

    @ClassRule
    public static EmbeddedActiveMQResource resource = new 
EmbeddedActiveMQResource();

    private volatile boolean receivedMsg0;

    @Test
    public void test() throws InterruptedException {
        AtomicBoolean driverShutdownFlag = new AtomicBoolean();
        Thread consumerDriverThread = new Thread(() -> 
consumerDriver(driverShutdownFlag));
        Thread producerThread = new Thread(this::producer);

        producerThread.start();
        consumerDriverThread.start();

        producerThread.join();
        driverShutdownFlag.set(true);
        consumerDriverThread.join();

        assertTrue("msg-0 not received", receivedMsg0);
    }

    private void consumerDriver(AtomicBoolean driverShutdownFlag) {
        try {
            Xid xid1 = new MyXid(new byte[] {1});
            Xid xid2 = new MyXid(new byte[] {2});
            for (int id = 0; !driverShutdownFlag.get(); id++) {
                int finalId = id;
                Thread thread1 = null;
                Thread thread2 = null;
                try {
                    AtomicBoolean shutdownFlag = new AtomicBoolean();
                    thread1 = new Thread(() -> consumer(xid1, finalId, 
shutdownFlag));
                    thread2 = new Thread(() -> consumer(xid2, finalId, 
shutdownFlag));
                    thread1.start();
                    thread2.start();
                    Thread.sleep(15 + id < 4 ? 0 : 
ThreadLocalRandom.current().nextInt(100));
                    shutdownFlag.set(true);
                } finally {
                    if (thread1 != null) {
                        thread1.join();
                    }
                    if (thread2 != null) {
                        thread2.join();
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void consumer(Xid xid, int id, AtomicBoolean shutdownFlag) {
        try (
                XAConnection conn = getConnectionFactory().createXAConnection();
                XASession sess = conn.createXASession()
        ) {
            conn.start();
            XAResource res = sess.getXAResource();

            // rollback the xid first
            try {
                res.rollback(xid);
                System.out.println("consumer " + id + " rolled back, xid=" + 
xid);
            } catch (XAException e) {
                if (e.errorCode == XAException.XAER_NOTA) {
                    System.out.println("consumer " + id + " ignoring rollback, 
XAER_NOTA, xid=" + xid);
                }
                else throw e;
            }

            res.start(xid, XAResource.TMNOFLAGS);
            MessageConsumer consumer = 
sess.createConsumer(sess.createQueue("queue"));
            for (int msgCnt = 0; !shutdownFlag.get(); ) {
                TextMessage msg = (TextMessage) consumer.receiveNoWait();
                if (msg == null) {
                    continue;
                }
                msgCnt++;
                if (msg.getText().equals("msg-0")) {
                    receivedMsg0 = true;
                }
                System.out.println("consumer " + id + " received: " + 
msg.getText() + ", xid=" + xid);
                if (msgCnt % 10 == 9) {
                    res.end(xid, XAResource.TMSUCCESS);
                    res.prepare(xid);
                    System.out.println("consumer " + id + " going to commit..., 
xid=" + xid);
                    res.commit(xid, false);
                    System.out.println("consumer " + id + " committed, xid=" + 
xid);
                    res.start(xid, XAResource.TMNOFLAGS);
                }
            }
            System.out.println("consumer " + id + " shut down, xid=" + xid);
        } catch (XAException e) {
            throw new RuntimeException("XAException, errorCode=" + e.errorCode, 
e);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    private void producer() {
        try (
                Connection conn = ((ConnectionFactory) 
getConnectionFactory()).createConnection();
                Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
                MessageProducer producer = 
session.createProducer(session.createQueue("queue"))
        ) {
            long startTime = System.nanoTime();
            for (int i = 0; i < 1_000; i++) {
                producer.send(session.createTextMessage("msg-" + i));
                Thread.sleep(Math.max(0, i - 
NANOSECONDS.toMillis(System.nanoTime() - startTime)));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static XAConnectionFactory getConnectionFactory() {
        return new ActiveMQXAConnectionFactory(resource.getVmURL());
    }

    private static final class MyXid implements Xid {

        private final byte[] gtrid;

        private MyXid(byte[] gtrid) {
            this.gtrid = gtrid;
        }

        @Override
        public int getFormatId() {
            return 9999;
        }

        @Override
        public byte[] getGlobalTransactionId() {
            return gtrid;
        }

        @Override
        public byte[] getBranchQualifier() {
            return new byte[0];
        }

        @Override
        public String toString() {
            return Arrays.toString(gtrid);
        }
    }
}
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to