Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 53c9d7ecb -> 2ec32b67a


https://issues.apache.org/jira/browse/AMQ-5426

Fixing a race condition in ActiveMQMessageConsumer that could cause an
NPE when the consumer is closing

Thanks to Michael Wong for providing the test case for this issue.

(cherry picked from commit 6bfa13b6e707fb3465a9193cd44c478514fcd948)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2ec32b67
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2ec32b67
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2ec32b67

Branch: refs/heads/activemq-5.13.x
Commit: 2ec32b67af3f494d65a076b32f71560168bb6ec9
Parents: 53c9d7e
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Tue Jul 5 19:32:17 2016 +0000
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Tue Jul 5 20:08:07 2016 +0000

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  42 ++--
 .../org/apache/activemq/bugs/AMQ5426Test.java   | 227 +++++++++++++++++++
 2 files changed, 251 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec32b67/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 90ca7c2..f2a78da 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     private boolean clearDeliveredList;
     AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
 
-    private MessageAck pendingAck;
+    private volatile MessageAck pendingAck;
     private long lastDeliveredSequenceId = -1;
 
     private IOException failureError;
@@ -780,6 +780,9 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     void deliverAcks() {
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+            //Capture the pendingAck reference in case the optimizeAcknowledge 
dispatch
+            //thread mutates it
+            final MessageAck oldPendingAck = pendingAck;
             if (isAutoAcknowledgeEach()) {
                 synchronized(deliveredMessages) {
                     ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -787,12 +790,12 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                         deliveredMessages.clear();
                         ackCounter = 0;
                     } else {
-                        ack = pendingAck;
+                        ack = oldPendingAck;
                         pendingAck = null;
                     }
                 }
-            } else if (pendingAck != null && pendingAck.isStandardAck()) {
-                ack = pendingAck;
+            } else if (oldPendingAck != null && oldPendingAck.isStandardAck()) 
{
+                ack = oldPendingAck;
                 pendingAck = null;
             }
             if (ack != null) {
@@ -971,8 +974,9 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                                     // we won't sent standard acks with every 
msg just
                                     // because the deliveredCounter just below
                                     // 0.5 * prefetch as used in ackLater()
-                                    if (pendingAck != null && deliveredCounter 
> 0) {
-                                        session.sendAck(pendingAck);
+                                    final MessageAck oldPendingAck = 
pendingAck;
+                                    if (oldPendingAck != null && 
deliveredCounter > 0) {
+                                        session.sendAck(oldPendingAck);
                                         pendingAck = null;
                                         deliveredCounter = 0;
                                     }
@@ -1035,29 +1039,31 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
 
         deliveredCounter++;
 
-        MessageAck oldPendingAck = pendingAck;
-        pendingAck = new MessageAck(md, ackType, deliveredCounter);
-        
pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
-        if( oldPendingAck==null ) {
-            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
-        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
-            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+        final MessageAck oldPendingAck = pendingAck;
+        final MessageAck newPendingAck = new MessageAck(md, ackType, 
deliveredCounter);
+        
newPendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+        if (oldPendingAck == null) {
+            newPendingAck.setFirstMessageId(newPendingAck.getLastMessageId());
+        } else if (oldPendingAck.getAckType() == newPendingAck.getAckType()) {
+            newPendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
         } else {
             // old pending ack being superseded by ack of another type, if is 
is not a delivered
             // ack and hence important, send it now so it is not lost.
             if (!oldPendingAck.isDeliveredAck()) {
-                LOG.debug("Sending old pending ack {}, new pending: {}", 
oldPendingAck, pendingAck);
+                LOG.debug("Sending old pending ack {}, new pending: {}", 
oldPendingAck, newPendingAck);
                 session.sendAck(oldPendingAck);
             } else {
-                LOG.debug("dropping old pending ack {}, new pending: {}", 
oldPendingAck, pendingAck);
+                LOG.debug("dropping old pending ack {}, new pending: {}", 
oldPendingAck, newPendingAck);
             }
         }
+        pendingAck = newPendingAck;
+
         // AMQ-3956 evaluate both expired and normal msgs as
         // otherwise consumer may get stalled
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - 
additionalWindowSize)) {
-            LOG.debug("ackLater: sending: {}", pendingAck);
-            session.sendAck(pendingAck);
-            pendingAck=null;
+            LOG.debug("ackLater: sending: {}", newPendingAck);
+            session.sendAck(newPendingAck);
+            pendingAck = null;
             deliveredCounter = 0;
             additionalWindowSize = 0;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec32b67/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
new file mode 100644
index 0000000..da09aaf
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ5426Test {
+
+       private static final Logger LOG = LoggerFactory
+                       .getLogger(AMQ5426Test.class);
+
+       private BrokerService brokerService;
+       private String connectionUri;
+       private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false);
+       private Thread producerThread;
+       private AtomicBoolean hasErrorInLogger;
+       private Appender errorDetectorAppender;
+
+       protected ConnectionFactory createConnectionFactory() throws Exception {
+               ActiveMQConnectionFactory conFactory = new 
ActiveMQConnectionFactory(
+                               connectionUri);
+               conFactory.setWatchTopicAdvisories(false);
+               conFactory.setOptimizeAcknowledge(true);
+               return conFactory;
+       }
+
+       @Before
+       public void setUp() throws Exception {
+               hasFailureInProducer = new AtomicBoolean(false);
+               hasErrorInLogger = new AtomicBoolean(false);
+               brokerService = BrokerFactory.createBroker(new URI(
+                               
"broker://()/localhost?persistent=false&useJmx=true"));
+
+               PolicyEntry policy = new PolicyEntry();
+               policy.setTopicPrefetch(100);
+               PolicyMap pMap = new PolicyMap();
+               pMap.setDefaultEntry(policy);
+               brokerService.addConnector("tcp://0.0.0.0:0");
+               brokerService.start();
+               connectionUri = 
brokerService.getTransportConnectorByScheme("tcp")
+                               .getPublishableConnectString();
+
+               // Register an error listener to LOG4J
+               // The NPE will not be detectable as of V5.10 from
+               // ActiveMQConnection.setClientInternalExceptionListener
+               // since ActiveMQMessageConsumer.dispatch will silently catch 
and
+               // discard any RuntimeException
+               errorDetectorAppender = new AppenderSkeleton() {
+                       @Override
+                       public void close() {
+                               // Do nothing
+                       }
+
+                       @Override
+                       public boolean requiresLayout() {
+                               return false;
+                       }
+
+                       @Override
+                       protected void append(LoggingEvent event) {
+                               if 
(event.getLevel().isGreaterOrEqual(Level.ERROR))
+                                       hasErrorInLogger.set(true);
+                       }
+               };
+
+               
org.apache.log4j.Logger.getRootLogger().addAppender(errorDetectorAppender);
+               producerThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       Connection connection = 
createConnectionFactory()
+                                                       .createConnection();
+                                       connection.start();
+                                       Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                                       Topic destination = 
session.createTopic("test.AMQ5426");
+                                       LOG.debug("Created topic: {}", 
destination);
+                                       MessageProducer producer = 
session.createProducer(destination);
+                                       
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                                       producer.setTimeToLive(1000);
+                                       LOG.debug("Created producer: {}", 
producer);
+
+                                       int i = 1;
+                                       while (!Thread.interrupted()) {
+                                               try {
+                                                       TextMessage msg = 
session.createTextMessage(" testMessage " + i);
+                                                       producer.send(msg);
+                                                       try {
+                                                               // Sleep for 
some nano seconds
+                                                               Thread.sleep(0, 
100);
+                                                       } catch 
(InterruptedException e) {
+                                                               // Restore the 
interrupt
+                                                               
Thread.currentThread().interrupt();
+                                                       }
+                                                       LOG.debug("message 
sent: {}", i);
+                                                       i++;
+                                               } catch (JMSException e) {
+                                                       // Sometimes, we will 
get a JMSException with nested
+                                                       // 
InterruptedIOException when we interrupt the thread
+                                                       if (!(e.getCause() != 
null && e.getCause() instanceof InterruptedIOException)) {
+                                                               throw e;
+                                                       }
+                                               }
+                                       }
+
+                                       producer.close();
+                                       session.close();
+                                       connection.close();
+                               } catch (Exception e) {
+                                       LOG.error(e.getMessage(), e);
+                                       hasFailureInProducer.set(true);
+                               }
+                       }
+               });
+
+               producerThread.start();
+       }
+
+       @Test(timeout = 2 * 60 * 1000)
+       public void testConsumerProperlyClosedWithoutError() throws Exception {
+               Random rn = new Random();
+
+               final int NUMBER_OF_RUNS = 1000;
+
+               for (int run = 0; run < NUMBER_OF_RUNS; run++) {
+                       final AtomicInteger numberOfMessagesReceived = new 
AtomicInteger(0);
+                       LOG.info("Starting run {} of {}", run, NUMBER_OF_RUNS);
+
+                       // Starts a consumer
+                       Connection connection = 
createConnectionFactory().createConnection();
+                       connection.start();
+
+                       Session session = connection.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
+                       Topic destination = session.createTopic("test.AMQ5426");
+
+                       LOG.debug("Created topic: {}", destination);
+                       MessageConsumer consumer = 
session.createConsumer(destination);
+                       consumer.setMessageListener(new MessageListener() {
+
+                               @Override
+                               public void onMessage(Message message) {
+                                       LOG.debug("Received message");
+                                       
numberOfMessagesReceived.getAndIncrement();
+                               }
+                       });
+                       LOG.debug("Created consumer: {}", consumer);
+
+                       try {
+                               // Sleep for a random time
+                               Thread.sleep(rn.nextInt(5) + 1);
+                       } catch (InterruptedException e) {
+                               // Restore the interrupt
+                               Thread.currentThread().interrupt();
+                       }
+
+                       // Close the consumer
+                       LOG.debug("Closing consumer");
+                       consumer.close();
+                       session.close();
+                       connection.close();
+
+                       assertFalse("Exception in Producer Thread", 
hasFailureInProducer.get());
+                       assertFalse("Error detected in Logger", 
hasErrorInLogger.get());
+                       LOG.info("Run {} of {} completed, message received: 
{}", run,
+                                       NUMBER_OF_RUNS, 
numberOfMessagesReceived.get());
+               }
+       }
+
+       @After
+       public void tearDown() throws Exception {
+               // Interrupt the producer thread
+               LOG.info("Shutdown producer thread");
+               producerThread.interrupt();
+               producerThread.join();
+               brokerService.stop();
+               brokerService.waitUntilStopped();
+
+               assertFalse("Exception in Producer Thread", 
hasFailureInProducer.get());
+               assertFalse("Error detected in Logger", hasErrorInLogger.get());
+       }
+}

Reply via email to