This is an automated email from the ASF dual-hosted git repository. wy96f pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 4a61d2d ARTEMIS-2380 Fix delivering message in the case of consume close new 9c1cbf3 This closes #2703 4a61d2d is described below commit 4a61d2dc769ebc76b5cb18a240931ad169c0123e Author: Wei Yang <wy96...@gmail.com> AuthorDate: Fri Aug 30 17:38:04 2019 +0800 ARTEMIS-2380 Fix delivering message in the case of consume close --- .../apache/activemq/artemis/core/server/Queue.java | 4 +++ .../artemis/core/server/ServerSession.java | 6 ++++ .../artemis/core/server/impl/QueueImpl.java | 23 +++++++++++++ .../artemis/core/server/impl/RefsOperation.java | 28 ++++++++++++++- .../core/server/impl/ServerConsumerImpl.java | 13 ++++++- .../core/server/impl/ServerSessionImpl.java | 40 ++++++++++++++++++++++ .../server/impl/ScheduledDeliveryHandlerTest.java | 10 ++++++ .../tests/integration/client/ReceiveTest.java | 39 +++++++++++++++++++++ .../integration/management/QueueControlTest.java | 37 ++++++++++++++++++++ .../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++++ 10 files changed, 208 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 3ff1b84..adcb72e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -129,6 +129,10 @@ public interface Queue extends Bindable,CriticalComponent { void addConsumer(Consumer consumer) throws Exception; + void addLingerSession(String sessionId); + + void removeLingerSession(String sessionId); + void removeConsumer(Consumer consumer); int getConsumerCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 210fe89..f940ec2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -429,6 +429,10 @@ public interface ServerSession extends SecurityAuth { List<MessageReference> getInTXMessagesForConsumer(long consumerId); + List<MessageReference> getInTxLingerMessages(); + + void addLingerConsumer(ServerConsumer consumer); + String getValidatedUser(); SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; @@ -490,4 +494,6 @@ public interface ServerSession extends SecurityAuth { int getProducerCount(); int getDefaultConsumerWindowSize(SimpleString address); + + String toManagementString(); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 04ca5d4..a9eab4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -101,6 +102,7 @@ import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; @@ -321,6 +323,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { */ private final Object directDeliveryGuard = new Object(); + private final ConcurrentHashSet<String> lingerSessionIds = new ConcurrentHashSet<>(); + public String debug() { StringWriter str = new StringWriter(); PrintWriter out = new PrintWriter(str); @@ -1261,6 +1265,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override + public void addLingerSession(String sessionId) { + lingerSessionIds.add(sessionId); + } + + @Override + public void removeLingerSession(String sessionId) { + lingerSessionIds.remove(sessionId); + } + + @Override public void removeConsumer(final Consumer consumer) { enterCritical(CRITICAL_CONSUMER); @@ -1585,6 +1599,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { mapReturn.put(holder.consumer.toManagementString(), msgs); } } + + for (String lingerSessionId : lingerSessionIds) { + ServerSession serverSession = server.getSessionByID(lingerSessionId); + List<MessageReference> refs = serverSession == null ? null : serverSession.getInTxLingerMessages(); + if (refs != null && !refs.isEmpty()) { + mapReturn.put(serverSession.toManagementString(), refs); + } + } + return mapReturn; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index f0b6d34..d3cd425 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -52,6 +52,8 @@ public class RefsOperation extends TransactionOperationAbstract { */ protected boolean ignoreRedeliveryCheck = false; + private String lingerSessionId = null; + public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) { this.queue = queue; this.reason = reason; @@ -97,6 +99,8 @@ public class RefsOperation extends TransactionOperationAbstract { List<MessageReference> ackedRefs = new ArrayList<>(); for (MessageReference ref : refsToAck) { + clearLingerRef(ref); + ref.emptyConsumerID(); if (logger.isTraceEnabled()) { @@ -175,8 +179,10 @@ public class RefsOperation extends TransactionOperationAbstract { @Override public void afterCommit(final Transaction tx) { for (MessageReference ref : refsToAck) { + clearLingerRef(ref); + synchronized (ref.getQueue()) { - queue.postAcknowledge(ref, reason); + ref.getQueue().postAcknowledge(ref, reason); } } @@ -190,6 +196,12 @@ public class RefsOperation extends TransactionOperationAbstract { } } + private void clearLingerRef(MessageReference ref) { + if (!ref.hasConsumerId() && lingerSessionId != null) { + ref.getQueue().removeLingerSession(lingerSessionId); + } + } + private void decrementRefCount(MessageReference refmsg) { try { refmsg.getMessage().decrementRefCount(); @@ -228,4 +240,18 @@ public class RefsOperation extends TransactionOperationAbstract { return refsToAck; } + public synchronized List<MessageReference> getLingerMessages() { + List<MessageReference> list = new LinkedList<>(); + for (MessageReference ref : refsToAck) { + if (!ref.hasConsumerId() && lingerSessionId != null) { + list.add(ref); + } + } + + return list; + } + + public void setLingerSession(String lingerSessionId) { + this.lingerSessionId = lingerSessionId; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index ddba797..c668f6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.server.impl; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -571,6 +571,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { tx.rollback(); + addLingerRefs(); + if (!browseOnly) { TypedProperties props = new TypedProperties(); @@ -607,6 +609,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private void addLingerRefs() throws Exception { + if (!browseOnly) { + List<MessageReference> lingerRefs = session.getInTXMessagesForConsumer(this.id); + if (lingerRefs != null && !lingerRefs.isEmpty()) { + session.addLingerConsumer(this); + } + } + } + @Override public void removeItself() throws Exception { if (browseOnly) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 7c26028..08f2d4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -2114,6 +2114,41 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public List<MessageReference> getInTxLingerMessages() { + Transaction transaction = tx; + if (transaction == null && callback != null) { + transaction = callback.getCurrentTransaction(); + } + RefsOperation operation = transaction == null ? null : (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + + return operation == null ? null : operation.getLingerMessages(); + } + + @Override + public void addLingerConsumer(ServerConsumer consumer) { + Transaction transaction = tx; + if (transaction == null && callback != null) { + transaction = callback.getCurrentTransaction(); + } + if (transaction != null) { + synchronized (transaction) { + // Transaction might be committed/rolledback, we need to synchronize and judge state + if (transaction.getState() != State.COMMITTED && transaction.getState() != State.ROLLEDBACK) { + RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + List<MessageReference> refs = operation == null ? null : operation.getListOnConsumer(consumer.getID()); + if (refs != null && !refs.isEmpty()) { + for (MessageReference ref : refs) { + ref.emptyConsumerID(); + } + operation.setLingerSession(name); + consumer.getQueue().addLingerSession(name); + } + } + } + } + } + + @Override public SimpleString removePrefix(SimpleString address) { if (prefixEnabled && address != null) { return PrefixUtil.getAddress(address, prefixes); @@ -2183,4 +2218,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); return as.getDefaultConsumerWindowSize(); } + + @Override + public String toManagementString() { + return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]"; + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5646905..e266ef5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1011,6 +1011,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public void addLingerSession(String sessionId) { + + } + + @Override + public void removeLingerSession(String sessionId) { + + } + + @Override public void removeConsumer(Consumer consumer) { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java index 40b8333..cca8b10 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java @@ -41,8 +41,12 @@ public class ReceiveTest extends ActiveMQTestBase { SimpleString addressA; + SimpleString addressB; + SimpleString queueA; + SimpleString queueB; + private ServerLocator locator; private ActiveMQServer server; @@ -54,6 +58,8 @@ public class ReceiveTest extends ActiveMQTestBase { addressA = RandomUtil.randomSimpleString(); queueA = RandomUtil.randomSimpleString(); + addressB = RandomUtil.randomSimpleString(); + queueB = RandomUtil.randomSimpleString(); locator = createInVMNonHALocator(); server = createServer(false); @@ -162,4 +168,37 @@ public class ReceiveTest extends ActiveMQTestBase { session.close(); sendSession.close(); } + + @Test + public void testMultiConsumersOnSession() throws Exception { + ClientSessionFactory cf = createSessionFactory(locator.setCallTimeout(10000000)); + ClientSession sendSession = cf.createSession(false, true, true); + ClientProducer cp1 = sendSession.createProducer(addressA); + ClientProducer cp2 = sendSession.createProducer(addressB); + + ClientSession session = cf.createSession(false, true, false); + session.createQueue(addressA, queueA, false); + session.createQueue(addressB, queueB, false); + + ClientConsumer cc1 = session.createConsumer(queueA); + ClientConsumer cc2 = session.createConsumer(queueB); + session.start(); + + cp1.send(sendSession.createMessage(false)); + cp2.send(sendSession.createMessage(false)); + Assert.assertNotNull(cc1.receive().acknowledge()); + Assert.assertNotNull(cc2.receive().acknowledge()); + session.commit(); + + final Queue queue1 = server.locateQueue(queueA); + final Queue queue2 = server.locateQueue(queueB); + + Wait.assertTrue(() -> queue1.getMessageCount() == 0, 500, 100); + Wait.assertTrue(() -> queue1.getMessagesAcknowledged() == 1, 500, 100); + Wait.assertTrue(() -> queue2.getMessageCount() == 0, 500, 100); + Wait.assertTrue(() -> queue2.getMessagesAcknowledged() == 1, 500, 100); + + session.close(); + sendSession.close(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 15b01b8..32e52cc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -639,6 +639,43 @@ public class QueueControlTest extends ManagementTestBase { } @Test + public void testListDeliveringMessagesOnClosedConsumer() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + int intValue = RandomUtil.randomInt(); + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + + Queue srvqueue = server.locateQueue(queue); + + QueueControl queueControl = createManagementControl(address, queue); + + ClientProducer producer = session.createProducer(address); + ClientMessage message = session.createMessage(durable); + message.putIntProperty(new SimpleString("key"), intValue); + producer.send(message); + producer.send(session.createMessage(durable)); + + ClientConsumer consumer = session.createConsumer(queue); + session.start(); + ClientMessage msgRec = consumer.receive(5000); + assertNotNull(msgRec); + assertEquals(msgRec.getIntProperty("key").intValue(), intValue); + assertEquals(1, srvqueue.getDeliveringCount()); + assertEquals(1, queueControl.listDeliveringMessages().size()); + + msgRec.acknowledge(); + consumer.close(); + assertEquals(1, srvqueue.getDeliveringCount()); + + System.out.println(queueControl.listDeliveringMessagesAsJSON()); + + Map<String, Map<String, Object>[]> deliveringMap = queueControl.listDeliveringMessages(); + assertEquals(1, deliveringMap.size()); + + session.deleteQueue(queue); + } + + @Test public void testListScheduledMessages() throws Exception { long delay = 2000; SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 4cf5346..7b7890f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -355,6 +355,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public void addLingerSession(String sessionId) { + + } + + @Override + public void removeLingerSession(String sessionId) { + + } + + @Override public void addRedistributor(final long delay) { // no-op