Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 2ec32b67a -> 8812cb9b8


Revert "https://issues.apache.org/jira/browse/AMQ-5426";

This reverts commit 2ec32b67af3f494d65a076b32f71560168bb6ec9.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/72c3a6d0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/72c3a6d0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/72c3a6d0

Branch: refs/heads/activemq-5.13.x
Commit: 72c3a6d0b4e72ffce30d07b7395031152009fe53
Parents: 2ec32b6
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Tue Jul 5 20:38:53 2016 +0000
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Tue Jul 5 20:38:53 2016 +0000

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  42 ++--
 .../org/apache/activemq/bugs/AMQ5426Test.java   | 227 -------------------
 2 files changed, 18 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/72c3a6d0/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index f2a78da..90ca7c2 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     private boolean clearDeliveredList;
     AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
 
-    private volatile MessageAck pendingAck;
+    private MessageAck pendingAck;
     private long lastDeliveredSequenceId = -1;
 
     private IOException failureError;
@@ -780,9 +780,6 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     void deliverAcks() {
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
-            //Capture the pendingAck reference in case the optimizeAcknowledge 
dispatch
-            //thread mutates it
-            final MessageAck oldPendingAck = pendingAck;
             if (isAutoAcknowledgeEach()) {
                 synchronized(deliveredMessages) {
                     ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -790,12 +787,12 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                         deliveredMessages.clear();
                         ackCounter = 0;
                     } else {
-                        ack = oldPendingAck;
+                        ack = pendingAck;
                         pendingAck = null;
                     }
                 }
-            } else if (oldPendingAck != null && oldPendingAck.isStandardAck()) 
{
-                ack = oldPendingAck;
+            } else if (pendingAck != null && pendingAck.isStandardAck()) {
+                ack = pendingAck;
                 pendingAck = null;
             }
             if (ack != null) {
@@ -974,9 +971,8 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                                     // we won't sent standard acks with every 
msg just
                                     // because the deliveredCounter just below
                                     // 0.5 * prefetch as used in ackLater()
-                                    final MessageAck oldPendingAck = 
pendingAck;
-                                    if (oldPendingAck != null && 
deliveredCounter > 0) {
-                                        session.sendAck(oldPendingAck);
+                                    if (pendingAck != null && deliveredCounter 
> 0) {
+                                        session.sendAck(pendingAck);
                                         pendingAck = null;
                                         deliveredCounter = 0;
                                     }
@@ -1039,31 +1035,29 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
 
         deliveredCounter++;
 
-        final MessageAck oldPendingAck = pendingAck;
-        final MessageAck newPendingAck = new MessageAck(md, ackType, 
deliveredCounter);
-        
newPendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
-        if (oldPendingAck == null) {
-            newPendingAck.setFirstMessageId(newPendingAck.getLastMessageId());
-        } else if (oldPendingAck.getAckType() == newPendingAck.getAckType()) {
-            newPendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+        MessageAck oldPendingAck = pendingAck;
+        pendingAck = new MessageAck(md, ackType, deliveredCounter);
+        
pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
+        if( oldPendingAck==null ) {
+            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
+        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
+            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
         } else {
             // old pending ack being superseded by ack of another type, if is 
is not a delivered
             // ack and hence important, send it now so it is not lost.
             if (!oldPendingAck.isDeliveredAck()) {
-                LOG.debug("Sending old pending ack {}, new pending: {}", 
oldPendingAck, newPendingAck);
+                LOG.debug("Sending old pending ack {}, new pending: {}", 
oldPendingAck, pendingAck);
                 session.sendAck(oldPendingAck);
             } else {
-                LOG.debug("dropping old pending ack {}, new pending: {}", 
oldPendingAck, newPendingAck);
+                LOG.debug("dropping old pending ack {}, new pending: {}", 
oldPendingAck, pendingAck);
             }
         }
-        pendingAck = newPendingAck;
-
         // AMQ-3956 evaluate both expired and normal msgs as
         // otherwise consumer may get stalled
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - 
additionalWindowSize)) {
-            LOG.debug("ackLater: sending: {}", newPendingAck);
-            session.sendAck(newPendingAck);
-            pendingAck = null;
+            LOG.debug("ackLater: sending: {}", pendingAck);
+            session.sendAck(pendingAck);
+            pendingAck=null;
             deliveredCounter = 0;
             additionalWindowSize = 0;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/72c3a6d0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
deleted file mode 100644
index da09aaf..0000000
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-
-import java.io.InterruptedIOException;
-import java.net.URI;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.log4j.Appender;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ5426Test {
-
-       private static final Logger LOG = LoggerFactory
-                       .getLogger(AMQ5426Test.class);
-
-       private BrokerService brokerService;
-       private String connectionUri;
-       private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false);
-       private Thread producerThread;
-       private AtomicBoolean hasErrorInLogger;
-       private Appender errorDetectorAppender;
-
-       protected ConnectionFactory createConnectionFactory() throws Exception {
-               ActiveMQConnectionFactory conFactory = new 
ActiveMQConnectionFactory(
-                               connectionUri);
-               conFactory.setWatchTopicAdvisories(false);
-               conFactory.setOptimizeAcknowledge(true);
-               return conFactory;
-       }
-
-       @Before
-       public void setUp() throws Exception {
-               hasFailureInProducer = new AtomicBoolean(false);
-               hasErrorInLogger = new AtomicBoolean(false);
-               brokerService = BrokerFactory.createBroker(new URI(
-                               
"broker://()/localhost?persistent=false&useJmx=true"));
-
-               PolicyEntry policy = new PolicyEntry();
-               policy.setTopicPrefetch(100);
-               PolicyMap pMap = new PolicyMap();
-               pMap.setDefaultEntry(policy);
-               brokerService.addConnector("tcp://0.0.0.0:0");
-               brokerService.start();
-               connectionUri = 
brokerService.getTransportConnectorByScheme("tcp")
-                               .getPublishableConnectString();
-
-               // Register an error listener to LOG4J
-               // The NPE will not be detectable as of V5.10 from
-               // ActiveMQConnection.setClientInternalExceptionListener
-               // since ActiveMQMessageConsumer.dispatch will silently catch 
and
-               // discard any RuntimeException
-               errorDetectorAppender = new AppenderSkeleton() {
-                       @Override
-                       public void close() {
-                               // Do nothing
-                       }
-
-                       @Override
-                       public boolean requiresLayout() {
-                               return false;
-                       }
-
-                       @Override
-                       protected void append(LoggingEvent event) {
-                               if 
(event.getLevel().isGreaterOrEqual(Level.ERROR))
-                                       hasErrorInLogger.set(true);
-                       }
-               };
-
-               
org.apache.log4j.Logger.getRootLogger().addAppender(errorDetectorAppender);
-               producerThread = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       Connection connection = 
createConnectionFactory()
-                                                       .createConnection();
-                                       connection.start();
-                                       Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                                       Topic destination = 
session.createTopic("test.AMQ5426");
-                                       LOG.debug("Created topic: {}", 
destination);
-                                       MessageProducer producer = 
session.createProducer(destination);
-                                       
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-                                       producer.setTimeToLive(1000);
-                                       LOG.debug("Created producer: {}", 
producer);
-
-                                       int i = 1;
-                                       while (!Thread.interrupted()) {
-                                               try {
-                                                       TextMessage msg = 
session.createTextMessage(" testMessage " + i);
-                                                       producer.send(msg);
-                                                       try {
-                                                               // Sleep for 
some nano seconds
-                                                               Thread.sleep(0, 
100);
-                                                       } catch 
(InterruptedException e) {
-                                                               // Restore the 
interrupt
-                                                               
Thread.currentThread().interrupt();
-                                                       }
-                                                       LOG.debug("message 
sent: {}", i);
-                                                       i++;
-                                               } catch (JMSException e) {
-                                                       // Sometimes, we will 
gt a JMSException with nested
-                                                       // 
InterruptedIOException when we interrupt the thread
-                                                       if (!(e.getCause() != 
null && e.getCause() instanceof InterruptedIOException)) {
-                                                               throw e;
-                                                       }
-                                               }
-                                       }
-
-                                       producer.close();
-                                       session.close();
-                                       connection.close();
-                               } catch (Exception e) {
-                                       LOG.error(e.getMessage(), e);
-                                       hasFailureInProducer.set(true);
-                               }
-                       }
-               });
-
-               producerThread.start();
-       }
-
-       @Test(timeout = 2 * 60 * 1000)
-       public void testConsumerProperlyClosedWithoutError() throws Exception {
-               Random rn = new Random();
-
-               final int NUMBER_OF_RUNS = 1000;
-
-               for (int run = 0; run < NUMBER_OF_RUNS; run++) {
-                       final AtomicInteger numberOfMessagesReceived = new 
AtomicInteger(0);
-                       LOG.info("Starting run {} of {}", run, NUMBER_OF_RUNS);
-
-                       // Starts a consumer
-                       Connection connection = 
createConnectionFactory().createConnection();
-                       connection.start();
-
-                       Session session = connection.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
-                       Topic destination = session.createTopic("test.AMQ5426");
-
-                       LOG.debug("Created topic: {}", destination);
-                       MessageConsumer consumer = 
session.createConsumer(destination);
-                       consumer.setMessageListener(new MessageListener() {
-
-                               @Override
-                               public void onMessage(Message message) {
-                                       LOG.debug("Received message");
-                                       
numberOfMessagesReceived.getAndIncrement();
-                               }
-                       });
-                       LOG.debug("Created consumer: {}", consumer);
-
-                       try {
-                               // Sleep for a random time
-                               Thread.sleep(rn.nextInt(5) + 1);
-                       } catch (InterruptedException e) {
-                               // Restore the interrupt
-                               Thread.currentThread().interrupt();
-                       }
-
-                       // Close the consumer
-                       LOG.debug("Closing consumer");
-                       consumer.close();
-                       session.close();
-                       connection.close();
-
-                       assertFalse("Exception in Producer Thread", 
hasFailureInProducer.get());
-                       assertFalse("Error detected in Logger", 
hasErrorInLogger.get());
-                       LOG.info("Run {} of {} completed, message received: 
{}", run,
-                                       NUMBER_OF_RUNS, 
numberOfMessagesReceived.get());
-               }
-       }
-
-       @After
-       public void tearDown() throws Exception {
-               // Interrupt the producer thread
-               LOG.info("Shutdown producer thread");
-               producerThread.interrupt();
-               producerThread.join();
-               brokerService.stop();
-               brokerService.waitUntilStopped();
-
-               assertFalse("Exception in Producer Thread", 
hasFailureInProducer.get());
-               assertFalse("Error detected in Logger", hasErrorInLogger.get());
-       }
-}

Reply via email to