Repository: activemq-artemis Updated Branches: refs/heads/master 0230a4026 -> 0e0dec664
ARTEMIS-46 Adds AMQP Drain Support Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54752a9c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54752a9c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54752a9c Branch: refs/heads/master Commit: 54752a9cedcceef9715345730eae4766be2c8458 Parents: 0230a40 Author: Martyn Taylor <[email protected]> Authored: Tue May 10 14:55:20 2016 +0100 Committer: Martyn Taylor <[email protected]> Committed: Wed May 11 12:04:58 2016 +0100 ---------------------------------------------------------------------- .../plug/ProtonSessionIntegrationCallback.java | 29 +++++++- artemis-protocols/artemis-proton-plug/pom.xml | 17 +++++ .../proton/plug/AMQPClientReceiverContext.java | 6 ++ .../org/proton/plug/AMQPSessionCallback.java | 2 +- .../plug/context/AbstractConnectionContext.java | 3 +- .../context/AbstractProtonContextSender.java | 2 +- .../context/AbstractProtonReceiverContext.java | 16 +++++ .../plug/context/ProtonDeliveryHandler.java | 2 +- .../plug/context/ProtonTransactionHandler.java | 2 +- .../client/ProtonClientReceiverContext.java | 3 +- .../server/ProtonServerReceiverContext.java | 2 +- .../server/ProtonServerSenderContext.java | 6 +- .../java/org/proton/plug/test/ProtonTest.java | 12 ++-- .../test/minimalserver/MinimalSessionSPI.java | 2 +- .../core/server/impl/ServerConsumerImpl.java | 26 ++++--- tests/integration-tests/pom.xml | 6 ++ .../tests/integration/proton/ProtonTest.java | 75 ++++++++++++++++++-- 17 files changed, 176 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 421a382..2350f9d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -17,11 +17,13 @@ package org.apache.activemq.artemis.core.protocol.proton.plug; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -70,6 +72,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private final Executor closeExecutor; + private final AtomicBoolean draining = new AtomicBoolean(false); + public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -88,9 +92,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void onFlowConsumer(Object consumer, int credits) { - // We have our own flow control on AMQP, so we set activemq's flow control to 0 - ((ServerConsumer) consumer).receiveCredits(-1); + public void onFlowConsumer(Object consumer, int credits, final boolean drain) { + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; + if (drain) { + // If the draining is already running, then don't do anything + if (draining.compareAndSet(false, true)) { + final ProtonPlugSender plugSender = (ProtonPlugSender) serverConsumer.getProtocolContext(); + serverConsumer.forceDelivery(1, new Runnable() { + @Override + public void run() { + try { + plugSender.getSender().drained(); + } + finally { + draining.set(false); + } + } + }); + } + } + else { + serverConsumer.receiveCredits(-1); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/pom.xml b/artemis-protocols/artemis-proton-plug/pom.xml index b23e08f..c667e45 100644 --- a/artemis-protocols/artemis-proton-plug/pom.xml +++ b/artemis-protocols/artemis-proton-plug/pom.xml @@ -110,6 +110,23 @@ </dependencies> + <!-- We use the proton plug test classes in some of the Artemis Integration tests --> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> <packaging>bundle</packaging> </project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java index 26d539e..514ee19 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java @@ -25,4 +25,10 @@ public interface AMQPClientReceiverContext { ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception; void flow(int credits); + + void drain(int i); + + int drained(); + + boolean isDraining(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index cce8e0c..0c0dbe0 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -32,7 +32,7 @@ public interface AMQPSessionCallback { void start(); - void onFlowConsumer(Object consumer, int credits); + void onFlowConsumer(Object consumer, int credits, boolean drain); Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 93442de..262dc2a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -70,6 +70,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl connectionCallback.setConnection(this); this.handler = ProtonHandler.Factory.create(dispatchExecutor); Transport transport = handler.getTransport(); + transport.setEmitFlowEventOnSend(false); if (idleTimeout > 0) { transport.setIdleTimeout(idleTimeout); } @@ -256,7 +257,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl @Override public void onFlow(Link link) throws Exception { - ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit()); + ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java index 7a4d295..6b209b8 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java @@ -51,7 +51,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { this.creditsSemaphore.setCredits(credits); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index 8481853..4286140 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -69,4 +69,20 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable } connection.flush(); } + + + public void drain(int credits) { + synchronized (connection.getLock()) { + receiver.drain(credits); + } + connection.flush(); + } + + public int drained() { + return receiver.drained(); + } + + public boolean isDraining() { + return receiver.draining(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java index 128ea65..ad7ff4f 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java @@ -25,7 +25,7 @@ import org.proton.plug.exceptions.ActiveMQAMQPException; */ public interface ProtonDeliveryHandler { - void onFlow(int currentCredits); + void onFlow(int currentCredits, boolean drain); void onMessage(Delivery delivery) throws ActiveMQAMQPException; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java index 6a9ad6a..1b32b32 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -111,7 +111,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java index ca8dc98..884af60 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java @@ -46,7 +46,7 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { } LinkedBlockingDeque<MessageImpl> queues = new LinkedBlockingDeque<>(); @@ -83,4 +83,5 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception { return queues.poll(time, unit); } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index 0406919..d0f798a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -46,7 +46,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { } @Override - public void onFlow(int credits) { + public void onFlow(int credits, boolean drain) { } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index dfc69df..ae1caa4 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -65,9 +65,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple } @Override - public void onFlow(int currentCredits) { - super.onFlow(currentCredits); - sessionSPI.onFlowConsumer(brokerConsumer, currentCredits); + public void onFlow(int currentCredits, boolean drain) { + super.onFlow(currentCredits, drain); + sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); } /* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java index acbb697..4c3aaf4 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java @@ -36,19 +36,19 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; import org.proton.plug.sasl.ClientSASLPlain; import org.proton.plug.test.minimalclient.SimpleAMQPConnector; import org.proton.plug.test.minimalserver.DumbServer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.proton.plug.util.ByteUtil; /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index c702957..3578926 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public void onFlowConsumer(Object consumer, int credits) { + public void onFlowConsumer(Object consumer, int credits, boolean drain) { } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 859b57d..0224c7d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -510,7 +510,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * there are no other messages to be delivered. */ @Override - public synchronized void forceDelivery(final long sequence) { + public void forceDelivery(final long sequence) { + forceDelivery(sequence, new Runnable() { + @Override + public void run() { + ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50); + + forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); + forcedDeliveryMessage.setAddress(messageQueue.getName()); + + callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); + } + }); + } + + public synchronized void forceDelivery(final long sequence, final Runnable r) { promptDelivery(); // JBPAPP-6030 - Using the executor to avoid distributed dead locks @@ -527,17 +541,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { messageQueue.getExecutor().execute(new Runnable() { @Override public void run() { - forceDelivery(sequence); + forceDelivery(sequence, r); } }); } else { - ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50); - - forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); - forcedDeliveryMessage.setAddress(messageQueue.getName()); - - callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); + r.run(); } } } @@ -546,7 +555,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } }); - } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/tests/integration-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index f0e1d14..38ac4c2 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -124,6 +124,12 @@ </dependency> <dependency> <groupId>org.apache.activemq</groupId> + <artifactId>artemis-proton-plug</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> <artifactId>artemis-stomp-protocol</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index efd5a85..9534681 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -40,6 +40,7 @@ import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -50,12 +51,18 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.message.ProtonJMessage; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.test.Constants; +import org.proton.plug.test.minimalclient.SimpleAMQPConnector; @RunWith(Parameterized.class) public class ProtonTest extends ActiveMQTestBase { @@ -214,10 +221,8 @@ public class ProtonTest extends ActiveMQTestBase { /* // Uncomment testLoopBrowser to validate the hunging on the test @Test - public void testLoopBrowser() throws Throwable - { - for (int i = 0 ; i < 1000; i++) - { + public void testLoopBrowser() throws Throwable { + for (int i = 0 ; i < 1000; i++) { System.out.println("#test " + i); testBrowser(); tearDown(); @@ -230,7 +235,7 @@ public class ProtonTest extends ActiveMQTestBase { * * @throws Throwable */ - // @Test TODO: re-enable this when we can get a version free of QPID-4901 bug + //@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug public void testBrowser() throws Throwable { boolean success = false; @@ -272,7 +277,7 @@ public class ProtonTest extends ActiveMQTestBase { connection.close(); Assert.assertEquals(getMessageCount(q), numMessages); } - }, 1000); + }, 5000); if (success) { break; @@ -290,6 +295,64 @@ public class ProtonTest extends ActiveMQTestBase { } @Test + public void testReceiveImmediate() throws Exception { + testReceiveImmediate(1000, 1000); + } + + @Test + public void testReceiveImmediateMoreCredits() throws Exception { + testReceiveImmediate(1000, 100); + } + + @Test + public void testReceiveImmediateMoreMessages() throws Exception { + testReceiveImmediate(100, 1000); + } + + public void testReceiveImmediate(int noCredits, int noMessages) throws Exception { + + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + for (int i = 0; i < noMessages; i++) { + message.setText("msg:" + i); + p.send(message); + } + + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.clientOpen(null); + + AMQPClientSessionContext csession = clientConnection.createClientSession(); + AMQPClientReceiverContext receiver = csession.createReceiver(address); + receiver.drain(noCredits); + + int expectedNumberMessages = noCredits > noMessages ? noMessages : noCredits; + for (int i = 0; i < expectedNumberMessages; i++) { + ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.SECONDS); + Assert.assertNotNull(protonJMessage); + } + ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS); + Assert.assertNull(protonJMessage); + + assertFalse(receiver.isDraining()); + if (noCredits >= noMessages) { + assertEquals(noCredits - noMessages, receiver.drained()); + } + else { + assertEquals(0, receiver.drained()); + } + } + + + @Test public void testConnection() throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
