ARTEMIS-722 Add DELAYED_DELIVERY capability to server connection open The server should indicate to clients that it supports the message annotation that allows message delivery to be delayed 'x-opt-delivery-time'
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/42ff4a60 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/42ff4a60 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/42ff4a60 Branch: refs/heads/master Commit: 42ff4a60482d3e991e1df05313b88807b4ea0be5 Parents: 795ddfc Author: Timothy Bish <[email protected]> Authored: Wed Sep 7 16:24:52 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Thu Sep 8 19:01:40 2016 -0400 ---------------------------------------------------------------------- .../org/proton/plug/AMQPConnectionContext.java | 10 ++ .../main/java/org/proton/plug/AmqpSupport.java | 1 + .../plug/context/AbstractConnectionContext.java | 19 +++- .../server/ProtonServerConnectionContext.java | 6 + .../tests/integration/proton/ProtonTest.java | 114 +++++++++++++++++++ 5 files changed, 144 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java index 45f9804..9123006 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java @@ -16,6 +16,8 @@ */ package org.proton.plug; +import org.apache.qpid.proton.amqp.Symbol; + import io.netty.buffer.ByteBuf; public interface AMQPConnectionContext { @@ -31,6 +33,14 @@ public interface AMQPConnectionContext { SASLResult getSASLResult(); /** + * Load and return a <code>[]Symbol</code> that contains the connection capabilities + * offered to new connections + * + * @return the capabilities that are offered to new remote peers on connect. + */ + Symbol[] getConnectionCapabilitiesOffered(); + + /** * Even though we are currently always sending packets asynchronsouly * we have a possibility to start trusting on the network flow control * and always sync on the send of the packet http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java index f57fd81..1580855 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java @@ -46,6 +46,7 @@ public class AmqpSupport { // Symbols used to announce connection information to remote peer. public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY"); public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index c881031..9ece790 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -16,6 +16,9 @@ */ package org.proton.plug.context; +import static org.proton.plug.AmqpSupport.PRODUCT; +import static org.proton.plug.AmqpSupport.VERSION; + import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -76,11 +79,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl ScheduledExecutorService scheduledPool) { this.connectionCallback = connectionCallback; this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); - connectionProperties.put(Symbol.valueOf("product"), "apache-activemq-artemis"); - connectionProperties.put(Symbol.valueOf("version"), VersionLoader.getVersion().getFullVersion()); + + connectionProperties.put(PRODUCT, "apache-activemq-artemis"); + connectionProperties.put(VERSION, VersionLoader.getVersion().getFullVersion()); + this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); - this.handler = ProtonHandler.Factory.create(dispatchExecutor); + this.handler = ProtonHandler.Factory.create(dispatchExecutor); Transport transport = handler.getTransport(); transport.setEmitFlowEventOnSend(false); if (idleTimeout > 0) { @@ -211,6 +216,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl connection.setContext(AbstractConnectionContext.this); connection.setContainer(containerId); connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); connection.open(); } initialise(); @@ -326,9 +332,10 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl System.err.println("Handler is null, can't delivery " + delivery); } } - } - - + @Override + public Symbol[] getConnectionCapabilitiesOffered() { + return null; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index bdb3a69..4124c2f 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -16,6 +16,9 @@ */ package org.proton.plug.context.server; +import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY; + +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; @@ -79,4 +82,7 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp } } + public Symbol[] getConnectionCapabilitiesOffered() { + return new Symbol[]{DELAYED_DELIVERY}; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/42ff4a60/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 711f6ff..8da5aa2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -16,6 +16,11 @@ */ package org.apache.activemq.artemis.tests.integration.proton; +import static org.proton.plug.AmqpSupport.contains; +import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY; +import static org.proton.plug.AmqpSupport.PRODUCT; +import static org.proton.plug.AmqpSupport.VERSION; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -63,6 +68,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -210,6 +216,114 @@ public class ProtonTest extends ProtonTestBase { } } + @Test(timeout = 60000) + public void testConnectionCarriesExpectedCapabilities() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) { + + Symbol[] offered = connection.getRemoteOfferedCapabilities(); + + if (!contains(offered, DELAYED_DELIVERY)) { + markAsInvalid("Broker did not indicate it support delayed message delivery"); + return; + } + + Map<Symbol, Object> properties = connection.getRemoteProperties(); + if (!properties.containsKey(PRODUCT)) { + markAsInvalid("Broker did not send a queue product name value"); + return; + } + + if (!properties.containsKey(VERSION)) { + markAsInvalid("Broker did not send a queue version value"); + return; + } + } + }); + + AmqpConnection connection = client.connect(); + try { + assertNotNull(connection); + connection.getStateInspector().assertValid(); + } + finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeHoldsMessage() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + assertNotNull(client); + + AmqpConnection connection = client.connect(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(address); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + // Shouldn't get this since we delayed the message. + assertNull(receiver.receive(5, TimeUnit.SECONDS)); + } + finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + assertNotNull(client); + + AmqpConnection connection = client.connect(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(address); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 2000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time"); + assertNotNull(msgDeliveryTime); + assertEquals(deliveryTime, msgDeliveryTime.longValue()); + } + finally { + connection.close(); + } + } + @Test public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
