Repository: activemq Updated Branches: refs/heads/master 6bfa13b6e -> c02bc6484
Revert "https://issues.apache.org/jira/browse/AMQ-5426" This reverts commit 6bfa13b6e707fb3465a9193cd44c478514fcd948. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3856c399 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3856c399 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3856c399 Branch: refs/heads/master Commit: 3856c3999ac8a29171c09de99bd0e8bf9cdaff4e Parents: 6bfa13b Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Tue Jul 5 20:27:02 2016 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Tue Jul 5 20:27:02 2016 +0000 ---------------------------------------------------------------------- .../activemq/ActiveMQMessageConsumer.java | 42 ++-- .../org/apache/activemq/bugs/AMQ5426Test.java | 227 ------------------- 2 files changed, 18 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3856c399/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index e736566..9e532db 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private boolean clearDeliveredList; AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0); - private volatile MessageAck pendingAck; + private MessageAck pendingAck; private long lastDeliveredSequenceId = -1; private IOException failureError; @@ -780,9 +780,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void deliverAcks() { MessageAck ack = null; if (deliveryingAcknowledgements.compareAndSet(false, true)) { - //Capture the pendingAck reference in case the optimizeAcknowledge dispatch - //thread mutates it - final MessageAck oldPendingAck = pendingAck; if (isAutoAcknowledgeEach()) { synchronized(deliveredMessages) { ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); @@ -790,12 +787,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredMessages.clear(); ackCounter = 0; } else { - ack = oldPendingAck; + ack = pendingAck; pendingAck = null; } } - } else if (oldPendingAck != null && oldPendingAck.isStandardAck()) { - ack = oldPendingAck; + } else if (pendingAck != null && pendingAck.isStandardAck()) { + ack = pendingAck; pendingAck = null; } if (ack != null) { @@ -974,9 +971,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater() - final MessageAck oldPendingAck = pendingAck; - if (oldPendingAck != null && deliveredCounter > 0) { - session.sendAck(oldPendingAck); + if (pendingAck != null && deliveredCounter > 0) { + session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } @@ -1039,31 +1035,29 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredCounter++; - final MessageAck oldPendingAck = pendingAck; - final MessageAck newPendingAck = new MessageAck(md, ackType, deliveredCounter); - newPendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); - if (oldPendingAck == null) { - newPendingAck.setFirstMessageId(newPendingAck.getLastMessageId()); - } else if (oldPendingAck.getAckType() == newPendingAck.getAckType()) { - newPendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); + MessageAck oldPendingAck = pendingAck; + pendingAck = new MessageAck(md, ackType, deliveredCounter); + pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); + if( oldPendingAck==null ) { + pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); + } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { + pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); } else { // old pending ack being superseded by ack of another type, if is is not a delivered // ack and hence important, send it now so it is not lost. if (!oldPendingAck.isDeliveredAck()) { - LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, newPendingAck); + LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck); session.sendAck(oldPendingAck); } else { - LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, newPendingAck); + LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck); } } - pendingAck = newPendingAck; - // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) { - LOG.debug("ackLater: sending: {}", newPendingAck); - session.sendAck(newPendingAck); - pendingAck = null; + LOG.debug("ackLater: sending: {}", pendingAck); + session.sendAck(pendingAck); + pendingAck=null; deliveredCounter = 0; additionalWindowSize = 0; } http://git-wip-us.apache.org/repos/asf/activemq/blob/3856c399/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java deleted file mode 100644 index da09aaf..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; - -import java.io.InterruptedIOException; -import java.net.URI; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.log4j.Appender; -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Level; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ5426Test { - - private static final Logger LOG = LoggerFactory - .getLogger(AMQ5426Test.class); - - private BrokerService brokerService; - private String connectionUri; - private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false); - private Thread producerThread; - private AtomicBoolean hasErrorInLogger; - private Appender errorDetectorAppender; - - protected ConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory( - connectionUri); - conFactory.setWatchTopicAdvisories(false); - conFactory.setOptimizeAcknowledge(true); - return conFactory; - } - - @Before - public void setUp() throws Exception { - hasFailureInProducer = new AtomicBoolean(false); - hasErrorInLogger = new AtomicBoolean(false); - brokerService = BrokerFactory.createBroker(new URI( - "broker://()/localhost?persistent=false&useJmx=true")); - - PolicyEntry policy = new PolicyEntry(); - policy.setTopicPrefetch(100); - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.start(); - connectionUri = brokerService.getTransportConnectorByScheme("tcp") - .getPublishableConnectString(); - - // Register an error listener to LOG4J - // The NPE will not be detectable as of V5.10 from - // ActiveMQConnection.setClientInternalExceptionListener - // since ActiveMQMessageConsumer.dispatch will silently catch and - // discard any RuntimeException - errorDetectorAppender = new AppenderSkeleton() { - @Override - public void close() { - // Do nothing - } - - @Override - public boolean requiresLayout() { - return false; - } - - @Override - protected void append(LoggingEvent event) { - if (event.getLevel().isGreaterOrEqual(Level.ERROR)) - hasErrorInLogger.set(true); - } - }; - - org.apache.log4j.Logger.getRootLogger().addAppender(errorDetectorAppender); - producerThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Connection connection = createConnectionFactory() - .createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic destination = session.createTopic("test.AMQ5426"); - LOG.debug("Created topic: {}", destination); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.setTimeToLive(1000); - LOG.debug("Created producer: {}", producer); - - int i = 1; - while (!Thread.interrupted()) { - try { - TextMessage msg = session.createTextMessage(" testMessage " + i); - producer.send(msg); - try { - // Sleep for some nano seconds - Thread.sleep(0, 100); - } catch (InterruptedException e) { - // Restore the interrupt - Thread.currentThread().interrupt(); - } - LOG.debug("message sent: {}", i); - i++; - } catch (JMSException e) { - // Sometimes, we will gt a JMSException with nested - // InterruptedIOException when we interrupt the thread - if (!(e.getCause() != null && e.getCause() instanceof InterruptedIOException)) { - throw e; - } - } - } - - producer.close(); - session.close(); - connection.close(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - hasFailureInProducer.set(true); - } - } - }); - - producerThread.start(); - } - - @Test(timeout = 2 * 60 * 1000) - public void testConsumerProperlyClosedWithoutError() throws Exception { - Random rn = new Random(); - - final int NUMBER_OF_RUNS = 1000; - - for (int run = 0; run < NUMBER_OF_RUNS; run++) { - final AtomicInteger numberOfMessagesReceived = new AtomicInteger(0); - LOG.info("Starting run {} of {}", run, NUMBER_OF_RUNS); - - // Starts a consumer - Connection connection = createConnectionFactory().createConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - Topic destination = session.createTopic("test.AMQ5426"); - - LOG.debug("Created topic: {}", destination); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - LOG.debug("Received message"); - numberOfMessagesReceived.getAndIncrement(); - } - }); - LOG.debug("Created consumer: {}", consumer); - - try { - // Sleep for a random time - Thread.sleep(rn.nextInt(5) + 1); - } catch (InterruptedException e) { - // Restore the interrupt - Thread.currentThread().interrupt(); - } - - // Close the consumer - LOG.debug("Closing consumer"); - consumer.close(); - session.close(); - connection.close(); - - assertFalse("Exception in Producer Thread", hasFailureInProducer.get()); - assertFalse("Error detected in Logger", hasErrorInLogger.get()); - LOG.info("Run {} of {} completed, message received: {}", run, - NUMBER_OF_RUNS, numberOfMessagesReceived.get()); - } - } - - @After - public void tearDown() throws Exception { - // Interrupt the producer thread - LOG.info("Shutdown producer thread"); - producerThread.interrupt(); - producerThread.join(); - brokerService.stop(); - brokerService.waitUntilStopped(); - - assertFalse("Exception in Producer Thread", hasFailureInProducer.get()); - assertFalse("Error detected in Logger", hasErrorInLogger.get()); - } -}
