This is an automated email from the ASF dual-hosted git repository.

wy96f pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a61d2d  ARTEMIS-2380 Fix delivering message in the case of consume close
     new 9c1cbf3  This closes #2703
4a61d2d is described below

commit 4a61d2dc769ebc76b5cb18a240931ad169c0123e
Author: Wei Yang <wy96...@gmail.com>
AuthorDate: Fri Aug 30 17:38:04 2019 +0800

    ARTEMIS-2380 Fix delivering message in the case of consume close
---
 .../apache/activemq/artemis/core/server/Queue.java |  4 +++
 .../artemis/core/server/ServerSession.java         |  6 ++++
 .../artemis/core/server/impl/QueueImpl.java        | 23 +++++++++++++
 .../artemis/core/server/impl/RefsOperation.java    | 28 ++++++++++++++-
 .../core/server/impl/ServerConsumerImpl.java       | 13 ++++++-
 .../core/server/impl/ServerSessionImpl.java        | 40 ++++++++++++++++++++++
 .../server/impl/ScheduledDeliveryHandlerTest.java  | 10 ++++++
 .../tests/integration/client/ReceiveTest.java      | 39 +++++++++++++++++++++
 .../integration/management/QueueControlTest.java   | 37 ++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++++
 10 files changed, 208 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 3ff1b84..adcb72e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -129,6 +129,10 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void addConsumer(Consumer consumer) throws Exception;
 
+   void addLingerSession(String sessionId);
+
+   void removeLingerSession(String sessionId);
+
    void removeConsumer(Consumer consumer);
 
    int getConsumerCount();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 210fe89..f940ec2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -429,6 +429,10 @@ public interface ServerSession extends SecurityAuth {
 
    List<MessageReference> getInTXMessagesForConsumer(long consumerId);
 
+   List<MessageReference> getInTxLingerMessages();
+
+   void addLingerConsumer(ServerConsumer consumer);
+
    String getValidatedUser();
 
    SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
@@ -490,4 +494,6 @@ public interface ServerSession extends SecurityAuth {
    int getProducerCount();
 
    int getDefaultConsumerWindowSize(SimpleString address);
+
+   String toManagementString();
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 04ca5d4..a9eab4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -101,6 +102,7 @@ import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
@@ -321,6 +323,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     */
    private final Object directDeliveryGuard = new Object();
 
+   private final ConcurrentHashSet<String> lingerSessionIds = new ConcurrentHashSet<>();
+
    public String debug() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
@@ -1261,6 +1265,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void addLingerSession(String sessionId) {
+      lingerSessionIds.add(sessionId);
+   }
+
+   @Override
+   public void removeLingerSession(String sessionId) {
+      lingerSessionIds.remove(sessionId);
+   }
+
+   @Override
    public void removeConsumer(final Consumer consumer) {
 
       enterCritical(CRITICAL_CONSUMER);
@@ -1585,6 +1599,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             mapReturn.put(holder.consumer.toManagementString(), msgs);
          }
       }
+
+      for (String lingerSessionId : lingerSessionIds) {
+         ServerSession serverSession = server.getSessionByID(lingerSessionId);
+         List<MessageReference> refs = serverSession == null ? null : serverSession.getInTxLingerMessages();
+         if (refs != null && !refs.isEmpty()) {
+            mapReturn.put(serverSession.toManagementString(), refs);
+         }
+      }
+
       return mapReturn;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index f0b6d34..d3cd425 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -52,6 +52,8 @@ public class RefsOperation extends TransactionOperationAbstract {
     */
    protected boolean ignoreRedeliveryCheck = false;
 
+   private String lingerSessionId = null;
+
    public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
       this.queue = queue;
       this.reason = reason;
@@ -97,6 +99,8 @@ public class RefsOperation extends TransactionOperationAbstract {
       List<MessageReference> ackedRefs = new ArrayList<>();
 
       for (MessageReference ref : refsToAck) {
+         clearLingerRef(ref);
+
          ref.emptyConsumerID();
 
          if (logger.isTraceEnabled()) {
@@ -175,8 +179,10 @@ public class RefsOperation extends TransactionOperationAbstract {
    @Override
    public void afterCommit(final Transaction tx) {
       for (MessageReference ref : refsToAck) {
+         clearLingerRef(ref);
+
          synchronized (ref.getQueue()) {
-            queue.postAcknowledge(ref, reason);
+            ref.getQueue().postAcknowledge(ref, reason);
          }
       }
 
@@ -190,6 +196,12 @@ public class RefsOperation extends TransactionOperationAbstract {
       }
    }
 
+   private void clearLingerRef(MessageReference ref) {
+      if (!ref.hasConsumerId() && lingerSessionId != null) {
+         ref.getQueue().removeLingerSession(lingerSessionId);
+      }
+   }
+
    private void decrementRefCount(MessageReference refmsg) {
       try {
          refmsg.getMessage().decrementRefCount();
@@ -228,4 +240,18 @@ public class RefsOperation extends TransactionOperationAbstract {
       return refsToAck;
    }
 
+   public synchronized List<MessageReference> getLingerMessages() {
+      List<MessageReference> list = new LinkedList<>();
+      for (MessageReference ref : refsToAck) {
+         if (!ref.hasConsumerId() && lingerSessionId != null) {
+            list.add(ref);
+         }
+      }
+
+      return list;
+   }
+
+   public void setLingerSession(String lingerSessionId) {
+      this.lingerSessionId = lingerSessionId;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index ddba797..c668f6e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -571,6 +571,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       tx.rollback();
 
+      addLingerRefs();
+
       if (!browseOnly) {
          TypedProperties props = new TypedProperties();
 
@@ -607,6 +609,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
+   private void addLingerRefs() throws Exception {
+      if (!browseOnly) {
+         List<MessageReference> lingerRefs = session.getInTXMessagesForConsumer(this.id);
+         if (lingerRefs != null && !lingerRefs.isEmpty()) {
+            session.addLingerConsumer(this);
+         }
+      }
+   }
+
    @Override
    public void removeItself() throws Exception {
       if (browseOnly) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 7c26028..08f2d4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -2114,6 +2114,41 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public List<MessageReference> getInTxLingerMessages() {
+      Transaction transaction = tx;
+      if (transaction == null && callback != null) {
+         transaction = callback.getCurrentTransaction();
+      }
+      RefsOperation operation = transaction == null ? null : (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+
+      return operation == null ? null : operation.getLingerMessages();
+   }
+
+   @Override
+   public void addLingerConsumer(ServerConsumer consumer) {
+      Transaction transaction = tx;
+      if (transaction == null && callback != null) {
+         transaction = callback.getCurrentTransaction();
+      }
+      if (transaction != null) {
+         synchronized (transaction) {
+            // Transaction might be committed/rolledback, we need to synchronize and judge state
+            if (transaction.getState() != State.COMMITTED && transaction.getState() != State.ROLLEDBACK) {
+               RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+               List<MessageReference> refs = operation == null ? null : operation.getListOnConsumer(consumer.getID());
+               if (refs != null && !refs.isEmpty()) {
+                  for (MessageReference ref : refs) {
+                     ref.emptyConsumerID();
+                  }
+                  operation.setLingerSession(name);
+                  consumer.getQueue().addLingerSession(name);
+               }
+            }
+         }
+      }
+   }
+
+   @Override
    public SimpleString removePrefix(SimpleString address) {
       if (prefixEnabled && address != null) {
          return PrefixUtil.getAddress(address, prefixes);
@@ -2183,4 +2218,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
       return as.getDefaultConsumerWindowSize();
    }
+
+   @Override
+   public String toManagementString() {
+      return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]";
+   }
 }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5646905..e266ef5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1011,6 +1011,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void addLingerSession(String sessionId) {
+
+      }
+
+      @Override
+      public void removeLingerSession(String sessionId) {
+
+      }
+
+      @Override
       public void removeConsumer(Consumer consumer) {
 
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java
index 40b8333..cca8b10 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java
@@ -41,8 +41,12 @@ public class ReceiveTest extends ActiveMQTestBase {
 
    SimpleString addressA;
 
+   SimpleString addressB;
+
    SimpleString queueA;
 
+   SimpleString queueB;
+
    private ServerLocator locator;
 
    private ActiveMQServer server;
@@ -54,6 +58,8 @@ public class ReceiveTest extends ActiveMQTestBase {
 
       addressA = RandomUtil.randomSimpleString();
       queueA = RandomUtil.randomSimpleString();
+      addressB = RandomUtil.randomSimpleString();
+      queueB = RandomUtil.randomSimpleString();
 
       locator = createInVMNonHALocator();
       server = createServer(false);
@@ -162,4 +168,37 @@ public class ReceiveTest extends ActiveMQTestBase {
       session.close();
       sendSession.close();
    }
+
+   @Test
+   public void testMultiConsumersOnSession() throws Exception {
+      ClientSessionFactory cf = createSessionFactory(locator.setCallTimeout(10000000));
+      ClientSession sendSession = cf.createSession(false, true, true);
+      ClientProducer cp1 = sendSession.createProducer(addressA);
+      ClientProducer cp2 = sendSession.createProducer(addressB);
+
+      ClientSession session = cf.createSession(false, true, false);
+      session.createQueue(addressA, queueA, false);
+      session.createQueue(addressB, queueB, false);
+
+      ClientConsumer cc1 = session.createConsumer(queueA);
+      ClientConsumer cc2 = session.createConsumer(queueB);
+      session.start();
+
+      cp1.send(sendSession.createMessage(false));
+      cp2.send(sendSession.createMessage(false));
+      Assert.assertNotNull(cc1.receive().acknowledge());
+      Assert.assertNotNull(cc2.receive().acknowledge());
+      session.commit();
+
+      final Queue queue1 = server.locateQueue(queueA);
+      final Queue queue2 = server.locateQueue(queueB);
+
+      Wait.assertTrue(() -> queue1.getMessageCount() == 0, 500, 100);
+      Wait.assertTrue(() -> queue1.getMessagesAcknowledged() == 1, 500, 100);
+      Wait.assertTrue(() -> queue2.getMessageCount() == 0, 500, 100);
+      Wait.assertTrue(() -> queue2.getMessagesAcknowledged() == 1, 500, 100);
+
+      session.close();
+      sendSession.close();
+   }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 15b01b8..32e52cc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -639,6 +639,43 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testListDeliveringMessagesOnClosedConsumer() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+      int intValue = RandomUtil.randomInt();
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+
+      Queue srvqueue = server.locateQueue(queue);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createMessage(durable);
+      message.putIntProperty(new SimpleString("key"), intValue);
+      producer.send(message);
+      producer.send(session.createMessage(durable));
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      session.start();
+      ClientMessage msgRec = consumer.receive(5000);
+      assertNotNull(msgRec);
+      assertEquals(msgRec.getIntProperty("key").intValue(), intValue);
+      assertEquals(1, srvqueue.getDeliveringCount());
+      assertEquals(1, queueControl.listDeliveringMessages().size());
+
+      msgRec.acknowledge();
+      consumer.close();
+      assertEquals(1, srvqueue.getDeliveringCount());
+
+      System.out.println(queueControl.listDeliveringMessagesAsJSON());
+
+      Map<String, Map<String, Object>[]> deliveringMap = queueControl.listDeliveringMessages();
+      assertEquals(1, deliveringMap.size());
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
    public void testListScheduledMessages() throws Exception {
       long delay = 2000;
       SimpleString address = RandomUtil.randomSimpleString();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 4cf5346..7b7890f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -355,6 +355,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void addLingerSession(String sessionId) {
+
+   }
+
+   @Override
+   public void removeLingerSession(String sessionId) {
+
+   }
+
+   @Override
    public void addRedistributor(final long delay) {
       // no-op
 

Reply via email to