Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 5b7f2d5c2 -> dea166ab8


ARTEMIS-1850 QueueControl.listDeliveringMessages returns empty result

With AMQP protocol when some messages are received in a transaction,
calling JMX QueueControl.listDeliveringMessages() returns empty list
before the transaction is committed.

(cherry picked from commit 72eadb201d870b097c8659497823f27bf2401d6f)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dea166ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dea166ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dea166ab

Branch: refs/heads/2.6.x
Commit: dea166ab83287ed15dba799aa9ed5effac75d416
Parents: 5b7f2d5
Author: Howard Gao <howard....@gmail.com>
Authored: Mon May 7 14:33:16 2018 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Nov 14 10:52:16 2018 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 19 ++++
 .../transaction/ProtonTransactionHandler.java   | 11 ++-
 .../core/server/impl/ServerSessionImpl.java     | 10 +++
 .../spi/core/protocol/SessionCallback.java      |  5 ++
 .../integration/amqp/JMXManagementTest.java     | 92 ++++++++++++++++++++
 5 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dea166ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 61816af..14c1042 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -56,6 +56,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import 
org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -109,6 +110,8 @@ public class AMQPSessionCallback implements SessionCallback 
{
 
    private final AddressQueryCache<AddressQueryResult> addressQueryCache = new 
AddressQueryCache<>();
 
+   private ProtonTransactionHandler transactionHandler;
+
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -690,6 +693,14 @@ public class AMQPSessionCallback implements 
SessionCallback {
       }
    }
 
+   @Override
+   public Transaction getCurrentTransaction() {
+      if (this.transactionHandler != null) {
+         return this.transactionHandler.getCurrentTransaction();
+      }
+      return null;
+   }
+
    public Transaction getTransaction(Binary txid, boolean remove) throws 
ActiveMQAMQPException {
       return protonSPI.getTransaction(txid, remove);
    }
@@ -740,6 +751,14 @@ public class AMQPSessionCallback implements 
SessionCallback {
       serverSession.removeProducer(name);
    }
 
+   public void setTransactionHandler(ProtonTransactionHandler 
transactionHandler) {
+      this.transactionHandler = transactionHandler;
+   }
+
+   public ProtonTransactionHandler getTransactionHandler() {
+      return this.transactionHandler;
+   }
+
 
    class AddressQueryCache<T> {
       SimpleString address;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dea166ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 9ccc196..78a5b33 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 import java.nio.ByteBuffer;
 
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -47,6 +48,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
 
    private final int amqpCredit;
    private final int amqpLowMark;
+   private Transaction currentTx;
 
    final AMQPSessionCallback sessionSPI;
    final AMQPConnectionContext connection;
@@ -58,6 +60,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
       this.connection = connection;
       this.amqpCredit = connection.getAmqpCredits();
       this.amqpLowMark = connection.getAmqpLowCredits();
+      this.sessionSPI.setTransactionHandler(this);
    }
 
    @Override
@@ -100,6 +103,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
             Binary txID = sessionSPI.newTransaction();
             Declared declared = new Declared();
             declared.setTxnId(txID);
+            currentTx = sessionSPI.getTransaction(txID, false);
             IOCallback ioAction = new IOCallback() {
                @Override
                public void done() {
@@ -115,7 +119,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
 
                @Override
                public void onError(int errorCode, String errorMessage) {
-
+                  currentTx = null;
                }
             };
             sessionSPI.afterIO(ioAction);
@@ -133,6 +137,7 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
                   try {
                      delivery.settle();
                      delivery.disposition(new Accepted());
+                     currentTx = null;
                   } finally {
                      connection.unlock();
                      connection.flush();
@@ -192,4 +197,8 @@ public class ProtonTransactionHandler implements 
ProtonDeliveryHandler {
       message.decode(encoded);
       return message;
    }
+
+   public Transaction getCurrentTransaction() {
+      return currentTx;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dea166ab/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 252da69..1d1f834 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1833,6 +1833,16 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
             return oper.getListOnConsumer(consumerId);
          }
       } else {
+         //amqp handles the transaction in callback
+         if (callback != null) {
+            Transaction transaction = callback.getCurrentTransaction();
+            if (transaction != null) {
+               RefsOperation operation = (RefsOperation) 
transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+               if (operation != null) {
+                  return operation.getListOnConsumer(consumerId);
+               }
+            }
+         }
          return Collections.emptyList();
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dea166ab/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index c4a2dbe..5577522 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public interface SessionCallback {
@@ -93,4 +94,8 @@ public interface SessionCallback {
    default void close(boolean failed) {
 
    }
+
+   default Transaction getCurrentTransaction() {
+      return null;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dea166ab/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
new file mode 100644
index 0000000..3be3e88
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import 
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Map;
+
+public class JMXManagementTest extends JMSClientTestSupport {
+
+   @Test
+   public void testListDeliveringMessages() throws Exception {
+      SimpleString queue = new SimpleString(getQueueName());
+
+      Connection connection1 = createConnection();
+      Connection connection2 = createConnection();
+      Session prodSession = connection1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Session consSession = connection2.createSession(true, 
Session.SESSION_TRANSACTED);
+
+      javax.jms.Queue jmsQueue = prodSession.createQueue(queue.toString());
+
+      QueueControl queueControl = createManagementControl(queue, queue);
+
+      MessageProducer producer = prodSession.createProducer(jmsQueue);
+      final int num = 20;
+
+      for (int i = 0; i < num; i++) {
+         TextMessage message = prodSession.createTextMessage("hello" + i);
+         producer.send(message);
+      }
+
+      connection2.start();
+      MessageConsumer consumer = consSession.createConsumer(jmsQueue);
+
+      for (int i = 0; i < num; i++) {
+         TextMessage msgRec = (TextMessage) consumer.receive(5000);
+         assertNotNull(msgRec);
+         assertEquals(msgRec.getText(), "hello" + i);
+      }
+
+      //before commit
+      assertEquals(num, queueControl.getDeliveringCount());
+
+      Map<String, Map<String, Object>[]> result = 
queueControl.listDeliveringMessages();
+      assertEquals(1, result.size());
+
+      Map<String, Object>[] msgMaps = 
result.entrySet().iterator().next().getValue();
+
+      assertEquals(num, msgMaps.length);
+
+      consSession.commit();
+      result = queueControl.listDeliveringMessages();
+
+      assertEquals(0, result.size());
+
+      consSession.close();
+      prodSession.close();
+
+      connection1.close();
+      connection2.close();
+   }
+
+   protected QueueControl createManagementControl(final SimpleString address,
+                                                  final SimpleString queue) 
throws Exception {
+      QueueControl queueControl = 
ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, 
this.mBeanServer);
+
+      return queueControl;
+   }
+}

Reply via email to