Repository: activemq-artemis
Updated Branches:
  refs/heads/master e305cc987 -> f138bc528


ARTEMIS-1300 Deadlock when Core direct delivering and AMQP receive


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8f5f2bbe
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8f5f2bbe
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8f5f2bbe

Branch: refs/heads/master
Commit: 8f5f2bbe5278de56c0afac4e634f30231f779cc5
Parents: e305cc9
Author: Clebert Suconic <[email protected]>
Authored: Wed Jul 19 19:20:46 2017 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Thu Jul 20 08:59:32 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  9 ++++--
 .../core/protocol/openwire/amq/AMQSession.java  |  7 +++++
 .../activemq/artemis/core/server/Consumer.java  |  9 ++++++
 .../artemis/core/server/impl/QueueImpl.java     | 30 ++++++++++++++++++--
 .../core/server/impl/ServerConsumerImpl.java    | 10 +++++++
 .../spi/core/protocol/SessionCallback.java      | 11 +++++++
 6 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f5f2bbe/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index ed15a56..98845cd 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -94,7 +94,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private AMQPSessionContext protonSession;
 
-   private final Executor closeExecutor;
+   private final Executor sessionExecutor;
 
    private final AtomicBoolean draining = new AtomicBoolean(false);
 
@@ -109,7 +109,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
       this.storageManager = manager.getServer().getStorageManager();
       this.connection = connection;
       this.transportConnection = transportConnection;
-      this.closeExecutor = executor;
+      this.sessionExecutor = executor;
       this.operationContext = operationContext;
    }
 
@@ -164,6 +164,11 @@ public class AMQPSessionCallback implements 
SessionCallback {
 
    }
 
+   @Override
+   public boolean supportsDirectDelivery() {
+      return false; // AMQP: proton needs its own locking, see SessionCallback#supportsDirectDelivery()
+   }
+
    public void init(AMQPSessionContext protonSession, SASLResult saslResult) 
throws Exception {
 
       this.protonSession = protonSession;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f5f2bbe/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index bd07251..9b6670e 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -129,6 +129,13 @@ public class AMQSession implements SessionCallback {
 
    }
 
+
+   @Override
+   public boolean supportsDirectDelivery() {
+      return false; // opts this session out of direct delivery, see SessionCallback#supportsDirectDelivery()
+   }
+
+
    @Override
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, 
MessageReference ref, boolean failed) {
       if (consumer.getProtocolData() != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f5f2bbe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 50c0b01..6df4889 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -19,10 +19,19 @@ package org.apache.activemq.artemis.core.server;
 import java.util.List;
 
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public interface Consumer {
 
    /**
+    * Whether the queue may use direct delivery while this consumer is registered; {@code true} unless overridden.
+    * @see SessionCallback#supportsDirectDelivery()
+    */
+   default boolean supportsDirectDelivery() {
+      return true;
+   }
+
+   /**
     * There was a change on semantic during 2.3 here.<br>
     * We now first accept the message, and the actual deliver is done as part 
of
     * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a 
lock on the queues while

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f5f2bbe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f922c3a..9269eb3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -227,6 +227,8 @@ public class QueueImpl implements Queue {
 
    private volatile boolean directDeliver = true;
 
+   private volatile boolean supportsDirectDeliver = true;
+
    private AddressSettingsRepositoryListener addressSettingsRepositoryListener;
 
    private final ExpiryScanner expiryScanner = new ExpiryScanner();
@@ -623,7 +625,7 @@ public class QueueImpl implements Queue {
          // The checkDirect flag is periodically set to true, if the delivery 
is specified as direct then this causes the
          // directDeliver flag to be re-computed resulting in direct delivery 
if the queue is empty
          // We don't recompute it on every delivery since executing isEmpty is 
expensive for a ConcurrentQueue
-         if (!directDeliver &&
+         if (supportsDirectDeliver && !directDeliver &&
             direct &&
             System.currentTimeMillis() - lastDirectDeliveryCheck > 
CHECK_QUEUE_SIZE_PERIOD) {
             lastDirectDeliveryCheck = System.currentTimeMillis();
@@ -636,13 +638,13 @@ public class QueueImpl implements Queue {
                // deliveries
                if (flushExecutor() && flushDeliveriesInTransit()) {
                   // Go into direct delivery mode
-                  directDeliver = true;
+                  directDeliver = supportsDirectDeliver;
                }
             }
          }
       }
 
-      if (direct && directDeliver && deliveriesInTransit.getCount() == 0 && 
deliverDirect(ref)) {
+      if (direct && supportsDirectDeliver && directDeliver && 
deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
          return;
       }
 
@@ -796,6 +798,10 @@ public class QueueImpl implements Queue {
 
          consumersChanged = true;
 
+         if (!consumer.supportsDirectDelivery()) {
+            this.supportsDirectDeliver = false;
+         }
+
          cancelRedistributor();
 
          consumerList.add(new ConsumerHolder(consumer));
@@ -828,6 +834,8 @@ public class QueueImpl implements Queue {
             }
          }
 
+         this.supportsDirectDeliver = checkConsumerDirectDeliver();
+
          if (pos > 0 && pos >= consumerList.size()) {
             pos = consumerList.size() - 1;
          }
@@ -864,6 +872,16 @@ public class QueueImpl implements Queue {
       }
    }
 
+   /** Returns true only if every registered consumer supports direct delivery; stops at the first opt-out. */
+   private boolean checkConsumerDirectDeliver() {
+      for (ConsumerHolder holder : consumerList) {
+         if (!holder.consumer.supportsDirectDelivery()) {
+            return false;
+         }
+      }
+      return true;
+   }
+
    @Override
    public synchronized void addRedistributor(final long delay) {
       clearRedistributorFuture();
@@ -2620,6 +2638,12 @@ public class QueueImpl implements Queue {
     */
    private boolean deliverDirect(final MessageReference ref) {
       synchronized (this) {
+         if (!supportsDirectDeliver) {
+            // this should never happen, but who knows?
+            // if someone ever change add and removeConsumer,
+            // this would protect any eventual bug
+            return false;
+         }
          if (paused || consumerList.isEmpty()) {
             return false;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f5f2bbe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index f614fa1..a685163 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -336,6 +336,16 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
       return refs;
    }
 
+   /**
+    * Asks the session callback whether direct delivery may be used; allowed when there is no callback.
+    * @see SessionCallback#supportsDirectDelivery()
+    */
+   @Override
+   public boolean supportsDirectDelivery() {
+      return callback == null || callback.supportsDirectDelivery();
+   }
+
+
    @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
       if (callback != null && !callback.hasCredits(this) || availableCredits 
!= null && availableCredits.get() <= 0) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f5f2bbe/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 799e8b0..edfb5dc 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -24,6 +24,17 @@ import 
org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public interface SessionCallback {
 
+   /** A requirement for direct delivery is that
+    *  no extra locking is required at the protocol layer.
+    *  This cannot be guaranteed for AMQP, as proton needs the locking.
+    *  So, disable this on AMQP or on any other protocol requiring an extra lock.
+    * @return {@code true} if direct delivery is allowed for this session (the default)
+    */
+   default boolean supportsDirectDelivery() {
+      return true;
+   }
+
+
    /**
     * This one gives a chance for Proton to have its own flow control.
     */

Reply via email to