Repository: activemq-artemis
Updated Branches:
  refs/heads/master c08429337 -> a18ad2784


ARTEMIS-724 Implement no-local consumer filter AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/553f2df7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/553f2df7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/553f2df7

Branch: refs/heads/master
Commit: 553f2df7455c74c6f7daca0c43bf1738822416dc
Parents: c084293
Author: Martyn Taylor <[email protected]>
Authored: Fri Sep 9 11:05:31 2016 +0100
Committer: Martyn Taylor <[email protected]>
Committed: Fri Sep 9 11:07:53 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java    | 14 ++++++++++----
 .../java/org/proton/plug/AMQPSessionCallback.java |  4 ++--
 .../context/server/ProtonServerSenderContext.java | 18 +++++++++++-------
 .../test/minimalserver/MinimalSessionSPI.java     |  4 ++--
 4 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/553f2df7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 107df8a..c3ac671 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -193,13 +194,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
-   public void createTemporaryQueue(String address, String queueName) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
+   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
    }
 
    @Override
-   public void createDurableQueue(String address, String queueName) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
+   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
    }
 
    @Override
@@ -404,7 +405,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
 
    private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
       try {
+
+         message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
          serverSession.send(message, false);
+
          // FIXME Potential race here...
          manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
             @Override
@@ -483,6 +487,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    @Override
    public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
 
+      message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
+
       ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
 
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/553f2df7/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index 4599318..8406431 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -40,9 +40,9 @@ public interface AMQPSessionCallback {
 
    void createTemporaryQueue(String queueName) throws Exception;
 
-   void createTemporaryQueue(String address, String queueName) throws Exception;
+   void createTemporaryQueue(String address, String queueName, String filter) throws Exception;
 
-   void createDurableQueue(String address, String queueName) throws Exception;
+   void createDurableQueue(String address, String queueName, String filter) throws Exception;
 
    void offerProducerCredit(String address, int credits, int threshold, Receiver receiver);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/553f2df7/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 564c9ba..78b1668 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -18,6 +18,7 @@ package org.proton.plug.context.server;
 
 import java.util.Map;
 
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -40,6 +41,7 @@ import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.AmqpSupport;
 import org.proton.plug.context.AbstractConnectionContext;
 import org.proton.plug.context.AbstractProtonContextSender;
 import org.proton.plug.context.AbstractProtonSessionContext;
@@ -114,6 +116,8 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
 
       String selector = null;
 
+      String noLocalFilter = null;
+
       /*
       * even tho the filter is a map it will only return a single filter unless a nolocal is also provided
       * */
@@ -130,6 +134,11 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
                return;
             }
          }
+
+         if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
+            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+            noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+         }
       }
 
       /*
@@ -138,11 +147,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
       * */
       boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
 
-      //filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
-
-      //if (filter != null) {
-         //todo implement nolocal filter
-      //}
       if (source == null) {
          // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
          String clientId = connection.getRemoteContainer();
@@ -195,14 +199,14 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
                   queue = clientId + ":" + pubId;
                   boolean exists = sessionSPI.queueQuery(queue);
                   if (!exists) {
-                     sessionSPI.createDurableQueue(source.getAddress(), queue);
+                     sessionSPI.createDurableQueue(source.getAddress(), queue, noLocalFilter);
                   }
                }
                //otherwise we are a volatile subscription
                else {
                   queue = java.util.UUID.randomUUID().toString();
                   try {
-                     sessionSPI.createTemporaryQueue(source.getAddress(), queue);
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, noLocalFilter);
                   }
                   catch (Exception e) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/553f2df7/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index 7397612..0701b17 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
-   public void createDurableQueue(String address, String queueName) throws Exception {
+   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
 
    }
 
@@ -81,7 +81,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
-   public void createTemporaryQueue(String address, String queueName) throws Exception {
+   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
 
    }
 

Reply via email to