This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 25d0b511ce31387a1baa767873af76e9635b1bd3 Author: Michael Pearce <[email protected]> AuthorDate: Wed Aug 21 08:19:32 2019 +0100 ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering --- .../protocol/amqp/broker/AMQPSessionCallback.java | 2 +- .../proton/transaction/ProtonTransactionImpl.java | 2 +- .../apache/activemq/artemis/core/server/Queue.java | 6 +++ .../artemis/core/server/ServerConsumer.java | 2 + .../artemis/core/server/impl/QueueImpl.java | 48 ++++++++++++++++- .../artemis/core/server/impl/RefsOperation.java | 7 ++- .../core/server/impl/ServerConsumerImpl.java | 9 +++- .../core/transaction/TransactionOperation.java | 4 ++ .../core/transaction/impl/TransactionImpl.java | 62 +++++++++++----------- .../server/impl/ScheduledDeliveryHandlerTest.java | 10 ++++ .../tests/integration/cli/DummyServerConsumer.java | 5 ++ .../tests/integration/client/JMSOrderTest.java | 50 +++++++++++++++++ .../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++ 13 files changed, 180 insertions(+), 37 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index dc249ff..4b2b669 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -363,7 +363,7 @@ public class AMQPSessionCallback implements SessionCallback { public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); - consumer.close(false); + consumer.close(false, true); consumer.getQueue().recheckRefCount(serverSession.getSessionContext()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java index 123dbb5..83128e1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java @@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl { private boolean discharged; public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) { - super(xid, storageManager, timeoutSeconds); + super(xid, storageManager, timeoutSeconds, true); addOperation(new TransactionOperationAbstract() { @Override public void afterCommit(Transaction tx) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index bab38d6..8b91aa9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -146,6 +146,9 @@ public interface Queue extends Bindable,CriticalComponent { ReferenceCounter getConsumersRefCount(); + /* Called when a message is cancelled back into the queue */ + void addSorted(List<MessageReference> refs, boolean scheduling); + void reload(MessageReference ref); void addTail(MessageReference ref); @@ -154,6 +157,9 @@ public interface Queue extends Bindable,CriticalComponent { void addHead(MessageReference ref, boolean scheduling); + /* Called when a message is cancelled back into the queue */ + void addSorted(MessageReference ref, boolean scheduling); + void addHead(List<MessageReference> refs, boolean scheduling); void acknowledge(MessageReference ref) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index f1f8b1e..0c9c5bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -63,6 +63,8 @@ public interface ServerConsumer extends Consumer, ConsumerInfo { void close(boolean failed) throws Exception; + void close(boolean failed, boolean sorted) throws Exception; + /** * This method is just to remove itself from Queues. * If for any reason during a close an exception occurred, the exception treatment diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e19d9ef..090a83e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -896,6 +896,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override + public void addSorted(final MessageReference ref, boolean scheduling) { + enterCritical(CRITICAL_PATH_ADD_HEAD); + synchronized (this) { + try { + if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { + return; + } + + internalAddSorted(ref); + + directDeliver = false; + } finally { + leaveCritical(CRITICAL_PATH_ADD_HEAD); + } + } + } + + /* Called when a message is cancelled back into the queue */ + @Override public void addHead(final List<MessageReference> refs, boolean scheduling) { enterCritical(CRITICAL_PATH_ADD_HEAD); synchronized (this) { @@ -913,6 +932,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + /* Called when a message is cancelled back into the queue */ + @Override + public void addSorted(final List<MessageReference> refs, boolean scheduling) { + enterCritical(CRITICAL_PATH_ADD_HEAD); + synchronized (this) { + try { + for (MessageReference ref : refs) { + addSorted(ref, scheduling); + } + + resetAllIterators(); + + deliverAsync(); + } finally { + leaveCritical(CRITICAL_PATH_ADD_HEAD); + } + } + } + @Override public synchronized void reload(final MessageReference ref) { queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); @@ -3461,13 +3499,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } void postRollback(final LinkedList<MessageReference> refs) { + postRollback(refs, false); + } + + void postRollback(final LinkedList<MessageReference> refs, boolean sorted) { //if we have purged then ignore adding the messages back if (purgeOnNoConsumers && getConsumerCount() == 0) { purgeAfterRollback(refs); return; } - addHead(refs, false); + if (sorted) { + addSorted(refs, false); + } else { + addHead(refs, false); + } } private void purgeAfterRollback(LinkedList<MessageReference> refs) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index c8d9297..925f439 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -79,6 +79,11 @@ public class RefsOperation extends TransactionOperationAbstract { @Override public void afterRollback(final Transaction tx) { + afterRollback(tx, false); + } + + @Override + public void afterRollback(final Transaction tx, boolean sorted) { Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>(); long timeBase = System.currentTimeMillis(); @@ -109,7 +114,7 @@ public class RefsOperation extends TransactionOperationAbstract { QueueImpl queue = entry.getKey(); synchronized (queue) { - queue.postRollback(refs); + queue.postRollback(refs, sorted); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 54cf9a2..ddba797 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -529,7 +529,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public synchronized void close(final boolean failed) throws Exception { + public void close(final boolean failed) throws Exception { + close(failed, false); + } + + @Override + public synchronized void close(final boolean failed, boolean sorted) throws Exception { // Close should only ever be done once per consumer. if (isClosed) return; @@ -555,7 +560,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { List<MessageReference> refs = cancelRefs(failed, false, null); - Transaction tx = new TransactionImpl(storageManager); + Transaction tx = new TransactionImpl(storageManager, sorted); refs.forEach(ref -> { if (logger.isTraceEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java index 5da1d97..5c7e7e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java @@ -52,6 +52,10 @@ public interface TransactionOperation { */ void afterRollback(Transaction tx); + default void afterRollback(Transaction tx, boolean sorted) { + afterRollback(tx); + } + List<MessageReference> getRelatedMessageReferences(); List<MessageReference> getListOnConsumer(long consumerID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index d459975..95983b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -63,6 +63,8 @@ public class TransactionImpl implements Transaction { private final long createTime; + private final boolean sorted; + private volatile boolean containsPersistent; private int timeoutSeconds = -1; @@ -96,47 +98,45 @@ public class TransactionImpl implements Transaction { } public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) { - this.storageManager = storageManager; - - xid = null; - - id = storageManager.generateID(); - - createTime = System.currentTimeMillis(); - - this.timeoutSeconds = timeoutSeconds; + this(storageManager.generateID(), null, storageManager, timeoutSeconds, false); } public TransactionImpl(final StorageManager storageManager) { - this.storageManager = storageManager; - - xid = null; - - id = storageManager.generateID(); + this(storageManager, false); + } - createTime = System.currentTimeMillis(); + public TransactionImpl(final StorageManager storageManager, boolean sorted) { + this(storageManager.generateID(), null, storageManager,-1, sorted); } public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { - this.storageManager = storageManager; - - this.xid = xid; + this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false); + } - id = storageManager.generateID(); + public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) { + this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted); + } - createTime = System.currentTimeMillis(); + public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) { + this(id, xid, storageManager, -1, false); + } - this.timeoutSeconds = timeoutSeconds; + public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) { + this(id, xid, storageManager, -1, sorted); } - public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) { + private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) { this.storageManager = storageManager; this.xid = xid; this.id = id; - createTime = System.currentTimeMillis(); + this.createTime = System.currentTimeMillis(); + + this.timeoutSeconds = timeoutSeconds; + + this.sorted = sorted; } // Transaction implementation @@ -217,7 +217,7 @@ public class TransactionImpl implements Transaction { logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this); } - internalRollback(); + internalRollback(sorted); if (exception != null) { throw exception; @@ -276,7 +276,7 @@ public class TransactionImpl implements Transaction { return; } if (state == State.ROLLBACK_ONLY) { - internalRollback(); + internalRollback(sorted); if (exception != null) { throw exception; @@ -379,11 +379,11 @@ public class TransactionImpl implements Transaction { } } - internalRollback(); + internalRollback(sorted); } } - private void internalRollback() throws Exception { + private void internalRollback(boolean sorted) throws Exception { if (logger.isTraceEnabled()) { logger.trace("TransactionImpl::internalRollback " + this); } @@ -418,7 +418,7 @@ public class TransactionImpl implements Transaction { @Override public void done() { - afterRollback(operationsToComplete); + afterRollback(operationsToComplete, sorted); } }); @@ -432,7 +432,7 @@ public class TransactionImpl implements Transaction { @Override public void done() { - afterRollback(storeOperationsToComplete); + afterRollback(storeOperationsToComplete, sorted); } }); } @@ -562,10 +562,10 @@ public class TransactionImpl implements Transaction { } } - private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) { + private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) { if (operationsToComplete != null) { for (TransactionOperation operation : operationsToComplete) { - operation.afterRollback(this); + operation.afterRollback(this, sorted); } // Help out GC here operationsToComplete.clear(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index ea15264..b94bd3a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1036,6 +1036,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public void addSorted(List<MessageReference> refs, boolean scheduling) { + addHead(refs, scheduling); + } + + @Override public void reload(MessageReference ref) { } @@ -1056,6 +1061,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public void addSorted(MessageReference ref, boolean scheduling) { + + } + + @Override public void addHead(List<MessageReference> refs, boolean scheduling) { for (MessageReference ref : refs) { addFirst(ref); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index ee7bdbb..9858357 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -91,6 +91,11 @@ public class DummyServerConsumer implements ServerConsumer { } @Override + public void close(boolean failed, boolean sorted) throws Exception { + + } + + @Override public void removeItself() throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java index dcc5a40..f4087d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java @@ -129,4 +129,54 @@ public class JMSOrderTest extends JMSTestBase { } + @Test(timeout = 60000) + public void testReceiveSomeThenClose() throws Exception { + Connection connection = protocolCF.createConnection(); + try { + connection.start(); + + int totalCount = 5; + int consumeBeforeRollback = 2; + + sendToAmqQueue(totalCount); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 1; i <= consumeBeforeRollback; i++) { + Message message = consumer.receive(3000); + assertNotNull(message); + assertEquals("Unexpected message number", i, message.getIntProperty("nr")); + } + + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + queue = session.createQueue(name.getMethodName()); + consumer = session.createConsumer(queue); + + // Consume again.. the previously consumed messages should get delivered + // again after the rollback and then the remainder should follow + List<Integer> messageNumbers = new ArrayList<>(); + for (int i = 1; i <= totalCount; i++) { + Message message = consumer.receive(3000); + assertNotNull("Failed to receive message: " + i, message); + int msgNum = message.getIntProperty("nr"); + System.out.println("Received " + msgNum); + messageNumbers.add(msgNum); + } + + session.commit(); + + assertEquals("Unexpected size of list", totalCount, messageNumbers.size()); + for (int i = 0; i < messageNumbers.size(); i++) { + assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i)); + } + } finally { + connection.close(); + } + + } + } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 612d621..da25078 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -269,6 +269,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public void addSorted(MessageReference ref, boolean scheduling) { + + } + + @Override public void addHead(List<MessageReference> ref, boolean scheduling) { // no-op @@ -460,6 +465,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public void addSorted(List<MessageReference> refs, boolean scheduling) { + + } + + @Override public Set<Consumer> getConsumers() { // no-op return null;
