ARTEMIS-722 Add DELAYED_DELIVERY capability to server connection open

The server should indicate to clients that it supports the message
annotation that allows message delivery to be delayed
'x-opt-delivery-time'


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/42ff4a60
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/42ff4a60
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/42ff4a60

Branch: refs/heads/master
Commit: 42ff4a60482d3e991e1df05313b88807b4ea0be5
Parents: 795ddfc
Author: Timothy Bish <[email protected]>
Authored: Wed Sep 7 16:24:52 2016 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Thu Sep 8 19:01:40 2016 -0400

----------------------------------------------------------------------
 .../org/proton/plug/AMQPConnectionContext.java  |  10 ++
 .../main/java/org/proton/plug/AmqpSupport.java  |   1 +
 .../plug/context/AbstractConnectionContext.java |  19 +++-
 .../server/ProtonServerConnectionContext.java   |   6 +
 .../tests/integration/proton/ProtonTest.java    | 114 +++++++++++++++++++
 5 files changed, 144 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
index 45f9804..9123006 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
@@ -16,6 +16,8 @@
  */
 package org.proton.plug;
 
+import org.apache.qpid.proton.amqp.Symbol;
+
 import io.netty.buffer.ByteBuf;
 
 public interface AMQPConnectionContext {
@@ -31,6 +33,14 @@ public interface AMQPConnectionContext {
    SASLResult getSASLResult();
 
    /**
+    * Load and return a <code>[]Symbol</code> that contains the connection 
capabilities
+    * offered to new connections
+    *
+    * @return the capabilities that are offered to new remote peers on connect.
+    */
+   Symbol[] getConnectionCapabilitiesOffered();
+
+   /**
     * Even though we are currently always sending packets asynchronsouly
     * we have a possibility to start trusting on the network flow control
     * and always sync on the send of the packet

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
index f57fd81..1580855 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
@@ -46,6 +46,7 @@ public class AmqpSupport {
 
    // Symbols used to announce connection information to remote peer.
    public static final Symbol ANONYMOUS_RELAY = 
Symbol.valueOf("ANONYMOUS-RELAY");
+   public static final Symbol DELAYED_DELIVERY = 
Symbol.valueOf("DELAYED_DELIVERY");
    public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
    public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
    public static final Symbol CONNECTION_OPEN_FAILED = 
Symbol.valueOf("amqp:connection-establishment-failed");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index c881031..9ece790 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -16,6 +16,9 @@
  */
 package org.proton.plug.context;
 
+import static org.proton.plug.AmqpSupport.PRODUCT;
+import static org.proton.plug.AmqpSupport.VERSION;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -76,11 +79,13 @@ public abstract class AbstractConnectionContext extends 
ProtonInitializable impl
                                     ScheduledExecutorService scheduledPool) {
       this.connectionCallback = connectionCallback;
       this.containerId = (containerId != null) ? containerId : 
UUID.randomUUID().toString();
-      connectionProperties.put(Symbol.valueOf("product"), 
"apache-activemq-artemis");
-      connectionProperties.put(Symbol.valueOf("version"), 
VersionLoader.getVersion().getFullVersion());
+
+      connectionProperties.put(PRODUCT, "apache-activemq-artemis");
+      connectionProperties.put(VERSION, 
VersionLoader.getVersion().getFullVersion());
+
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler =   ProtonHandler.Factory.create(dispatchExecutor);
+      this.handler = ProtonHandler.Factory.create(dispatchExecutor);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
       if (idleTimeout > 0) {
@@ -211,6 +216,7 @@ public abstract class AbstractConnectionContext extends 
ProtonInitializable impl
             connection.setContext(AbstractConnectionContext.this);
             connection.setContainer(containerId);
             connection.setProperties(connectionProperties);
+            
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
             connection.open();
          }
          initialise();
@@ -326,9 +332,10 @@ public abstract class AbstractConnectionContext extends 
ProtonInitializable impl
             System.err.println("Handler is null, can't delivery " + delivery);
          }
       }
-
    }
 
-
-
+   @Override
+   public Symbol[] getConnectionCapabilitiesOffered() {
+      return null;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
index bdb3a69..4124c2f 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
@@ -16,6 +16,9 @@
  */
 package org.proton.plug.context.server;
 
+import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
+
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
@@ -79,4 +82,7 @@ public class ProtonServerConnectionContext extends 
AbstractConnectionContext imp
       }
    }
 
+   public Symbol[] getConnectionCapabilitiesOffered() {
+      return new Symbol[]{DELAYED_DELIVERY};
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 711f6ff..8da5aa2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq.artemis.tests.integration.proton;
 
+import static org.proton.plug.AmqpSupport.contains;
+import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
+import static org.proton.plug.AmqpSupport.PRODUCT;
+import static org.proton.plug.AmqpSupport.VERSION;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -63,6 +68,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -210,6 +216,114 @@ public class ProtonTest extends ProtonTestBase {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testConnectionCarriesExpectedCapabilities() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for 
AMQP protocol
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), 
userName, password);
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void 
inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {
+
+            Symbol[] offered = connection.getRemoteOfferedCapabilities();
+
+            if (!contains(offered, DELAYED_DELIVERY)) {
+               markAsInvalid("Broker did not indicate it support delayed 
message delivery");
+               return;
+            }
+
+            Map<Symbol, Object> properties = connection.getRemoteProperties();
+            if (!properties.containsKey(PRODUCT)) {
+               markAsInvalid("Broker did not send a queue product name value");
+               return;
+            }
+
+            if (!properties.containsKey(VERSION)) {
+               markAsInvalid("Broker did not send a queue version value");
+               return;
+            }
+         }
+      });
+
+      AmqpConnection connection = client.connect();
+      try {
+         assertNotNull(connection);
+         connection.getStateInspector().assertValid();
+      }
+      finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for 
AMQP protocol
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), 
userName, password);
+      assertNotNull(client);
+
+      AmqpConnection connection = client.connect();
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(address);
+         AmqpReceiver receiver = session.createReceiver(address);
+
+         AmqpMessage message = new AmqpMessage();
+         long deliveryTime = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(5);
+         message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+         message.setText("Test-Message");
+         sender.send(message);
+
+         // Now try and get the message
+         receiver.flow(1);
+
+         // Shouldn't get this since we delayed the message.
+         assertNull(receiver.receive(5, TimeUnit.SECONDS));
+      }
+      finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws 
Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for 
AMQP protocol
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), 
userName, password);
+      assertNotNull(client);
+
+      AmqpConnection connection = client.connect();
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(address);
+         AmqpReceiver receiver = session.createReceiver(address);
+
+         AmqpMessage message = new AmqpMessage();
+         long deliveryTime = System.currentTimeMillis() + 2000;
+         message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+         message.setText("Test-Message");
+         sender.send(message);
+
+         // Now try and get the message
+         receiver.flow(1);
+
+         AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+         assertNotNull(received);
+         received.accept();
+         Long msgDeliveryTime = (Long) 
received.getMessageAnnotation("x-opt-delivery-time");
+         assertNotNull(msgDeliveryTime);
+         assertEquals(deliveryTime, msgDeliveryTime.longValue());
+      }
+      finally {
+         connection.close();
+      }
+   }
+
    @Test
    public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for 
AMQP protocol

Reply via email to