Repository: activemq-artemis
Updated Branches:
  refs/heads/master 0a1e6bdd5 -> 9743043fb


ARTEMIS-792 Add additional tests for AMQP protocol

Adds several tests for AMQP expectations in various use cases.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7a8b7e9c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7a8b7e9c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7a8b7e9c

Branch: refs/heads/master
Commit: 7a8b7e9cfba8d4a8adfe96ee99d8a9df1273ac18
Parents: 0a1e6bd
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Oct 11 12:16:28 2016 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Thu Oct 13 16:07:50 2016 -0400

----------------------------------------------------------------------
 .../integration/amqp/AmqpClientTestSupport.java |  67 ++---
 .../amqp/AmqpDeliveryAnnotationsTest.java       |  64 +++++
 .../amqp/AmqpDescribedTypePayloadTest.java      | 151 ++++++++++
 .../integration/amqp/AmqpReceiverDrainTest.java | 165 +++++++++++
 .../amqp/AmqpScheduledMessageTest.java          | 136 +++++++++
 .../integration/amqp/AmqpSendReceiveTest.java   | 279 ++++++++++++++++++-
 6 files changed, 823 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 2c7ce6f..14f9b61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -21,8 +21,12 @@ import java.net.URI;
 import java.util.LinkedList;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -35,9 +39,11 @@ import org.junit.Before;
  */
 public class AmqpClientTestSupport extends ActiveMQTestBase {
 
-   ActiveMQServer server;
+   private boolean useSSL;
 
-   LinkedList<AmqpConnection> connections = new LinkedList<>();
+   protected JMSServerManager serverManager;
+   protected ActiveMQServer server;
+   protected LinkedList<AmqpConnection> connections = new LinkedList<>();
 
    protected AmqpConnection addConnection(AmqpConnection connection) {
       connections.add(connection);
@@ -48,9 +54,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
    @Override
    public void setUp() throws Exception {
       super.setUp();
-
-      server = createServer(true, true);
-      server.start();
+      server = createServer();
    }
 
    @After
@@ -63,18 +67,36 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
             ignored.printStackTrace();
          }
       }
+
+      if (serverManager != null) {
+         try {
+            serverManager.stop();
+         } catch (Throwable ignored) {
+            ignored.printStackTrace();
+         }
+         serverManager = null;
+      }
+
       server.stop();
 
       super.tearDown();
    }
 
+   protected ActiveMQServer createServer() throws Exception {
+      ActiveMQServer server = createServer(true, true);
+      serverManager = new JMSServerManagerImpl(server);
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
+      serverConfig.setSecurityEnabled(false);
+      serverManager.start();
+      server.start();
+      return server;
+   }
+
    public Queue getProxyToQueue(String queueName) {
       return server.locateQueue(SimpleString.toSimpleString(queueName));
    }
 
-   private String connectorScheme = "amqp";
-   private boolean useSSL;
-
    public String getTestName() {
       return "jms.queue." + getName();
    }
@@ -83,14 +105,9 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
    }
 
    public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
-      this.connectorScheme = connectorScheme;
       this.useSSL = useSSL;
    }
 
-   public String getConnectorScheme() {
-      return connectorScheme;
-   }
-
    public boolean isUseSSL() {
       return useSSL;
    }
@@ -99,30 +116,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
       return "";
    }
 
-   protected boolean isUseTcpConnector() {
-      return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
-   }
-
-   protected boolean isUseSslConnector() {
-      return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
-   }
-
-   protected boolean isUseNioConnector() {
-      return !isUseSSL() && connectorScheme.contains("nio");
-   }
-
-   protected boolean isUseNioPlusSslConnector() {
-      return isUseSSL() && connectorScheme.contains("nio");
-   }
-
-   protected boolean isUseWsConnector() {
-      return !isUseSSL() && connectorScheme.contains("ws");
-   }
-
-   protected boolean isUseWssConnector() {
-      return isUseSSL() && connectorScheme.contains("wss");
-   }
-
    public URI getBrokerAmqpConnectionURI() {
       boolean webSocket = false;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java
new file mode 100644
index 0000000..93ff22b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test around the handling of Delivery Annotations in messages sent and received.
+ */
+public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
+
+   private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
+
+   @Test(timeout = 60000)
+   public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      AmqpMessage message = new AmqpMessage();
+
+      message.setText("Test-Message");
+      message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
+
+      sender.send(message);
+      receiver.flow(1);
+
+      Queue queue = getProxyToQueue(getTestName());
+      assertEquals(1, queue.getMessageCount());
+
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
+
+      sender.close();
+      connection.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
new file mode 100644
index 0000000..bbb9c26
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test that the broker can pass through an AMQP message with a described type in the message
+ * body regardless of transformer in use.
+ */
+public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testSendMessageWithDescribedTypeInBody() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+      message.setDescribedType(new AmqpNoLocalFilter());
+      sender.send(message);
+      sender.close();
+
+      Queue queue = getProxyToQueue(getTestName());
+      assertEquals(1, queue.getMessageCount());
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      assertNotNull(received.getDescribedType());
+      receiver.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+      message.setDescribedType(new AmqpNoLocalFilter());
+      sender.send(message);
+      sender.close();
+      connection.close();
+
+      Queue queue = getProxyToQueue(getTestName());
+      assertEquals(1, queue.getMessageCount());
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      Connection jmsConnection = factory.createConnection();
+      try {
+         Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = jmsSession.createQueue(getName());
+         MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+         jmsConnection.start();
+
+         Message received = jmsConsumer.receive(5000);
+         assertNotNull(received);
+         assertTrue(received instanceof BytesMessage);
+      } finally {
+         jmsConnection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testDescribedTypeMessageRoundTrips() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      // Send with AMQP client.
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+      message.setDescribedType(new AmqpNoLocalFilter());
+      sender.send(message);
+      sender.close();
+
+      Queue queue = getProxyToQueue(getTestName());
+      assertEquals(1, queue.getMessageCount());
+
+      // Receive and resend with OpenWire JMS client
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      Connection jmsConnection = factory.createConnection();
+      try {
+         Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = jmsSession.createQueue(getName());
+         MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+         jmsConnection.start();
+
+         Message received = jmsConsumer.receive(5000);
+         assertNotNull(received);
+         assertTrue(received instanceof BytesMessage);
+
+         MessageProducer jmsProducer = jmsSession.createProducer(destination);
+         jmsProducer.send(received);
+      } finally {
+         jmsConnection.close();
+      }
+
+      assertEquals(1, queue.getMessageCount());
+
+      // Now let's receive it with AMQP and see that we get back what we expected.
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(returned);
+      assertNotNull(returned.getDescribedType());
+      receiver.close();
+      connection.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
new file mode 100644
index 0000000..1af9028
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Tests various behaviors of broker side drain support.
+ */
+public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testReceiverCanDrainMessages() throws Exception {
+      int MSG_COUNT = 20;
+      sendMessages(getTestName(), MSG_COUNT);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+      receiver.drain(MSG_COUNT);
+      for (int i = 0; i < MSG_COUNT; ++i) {
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(message);
+         message.accept();
+      }
+      receiver.close();
+
+      assertEquals(0, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPullWithNoMessageGetDrained() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      receiver.flow(10);
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(0, queueView.getDeliveringCount());
+
+      assertEquals(10, receiver.getReceiver().getRemoteCredit());
+
+      assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+      assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPullOneFromRemote() throws Exception {
+      int MSG_COUNT = 20;
+      sendMessages(getTestName(), MSG_COUNT);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+      assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+      AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
+      assertNotNull(message);
+      message.accept();
+
+      assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+      receiver.close();
+
+      assertEquals(MSG_COUNT - 1, queueView.getMessageCount());
+      assertEquals(1, queueView.getMessagesAcknowledged());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMultipleZeroResultPulls() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      receiver.flow(10);
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(0, queueView.getMessageCount());
+
+      assertEquals(10, receiver.getReceiver().getRemoteCredit());
+
+      assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+      assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+      assertNull(receiver.pull(1, TimeUnit.SECONDS));
+      assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+      assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+      connection.close();
+   }
+
+   public void sendMessages(String destinationName, int count) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = null;
+
+      try {
+         connection = client.connect();
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message-" + i);
+            sender.send(message);
+         }
+
+         sender.close();
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
new file mode 100644
index 0000000..689c23c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test for scheduled message support using AMQP message annotations.
+ */
+public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testSendWithDeliveryTimeIsScheduled() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
+      message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      assertEquals(1, queueView.getScheduledCount());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNull(received);
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendRecvWithDeliveryTime() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      long deliveryTime = System.currentTimeMillis() + 6000;
+      message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      assertEquals(1, queueView.getScheduledCount());
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+
+      // Now try and get the message, should not due to being scheduled.
+      AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+      assertNull(received);
+
+      // Now try and get the message, should get it now
+      received = receiver.receive(10, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.accept();
+
+      connection.close();
+   }
+
+   @Test
+   public void testScheduleWithDelay() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      long delay = 6000;
+      message.setMessageAnnotation("x-opt-delivery-delay", delay);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      assertEquals(1, queueView.getScheduledCount());
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+
+      // Now try and get the message, should not due to being scheduled.
+      AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+      assertNull(received);
+
+      // Now try and get the message, should get it now
+      received = receiver.receive(10, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.accept();
+
+      connection.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 6597a62..6c50b86 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,22 +16,31 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +53,255 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
    protected static final Logger LOG = 
LoggerFactory.getLogger(AmqpSendReceiveTest.class);
 
    @Test(timeout = 60000)
+   public void testCreateQueueReceiver() throws Exception {
+      // Attach a receiver to the test-named address, then check that a queue
+      // view for that same name can be resolved on the broker side.
+      final String queueName = getTestName();
+
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpReceiver receiver = connection.createSession().createReceiver(queueName);
+
+      assertNotNull(getProxyToQueue(queueName));
+
+      receiver.close();
+      connection.close();
+   }
+
+   /**
+    * Attaches a receiver with a JMS selector and uses an {@link AmqpValidator}
+    * to check that the remote source reported on the opened link still carries
+    * a JMS selector filter.
+    */
+   @Test(timeout = 60000)
+   public void testCreateQueueReceiverWithJMSSelector() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            Source source = (Source) receiver.getRemoteSource();
+            if (source == null) {
+               markAsInvalid("Link opened with null source.");
+               // Nothing left to inspect; returning here avoids a
+               // NullPointerException on the filter lookup below.
+               return;
+            }
+
+            Map<Symbol, Object> filters = source.getFilter();
+
+            if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
+               markAsInvalid("Broker did not return the JMS Filter on Attach");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      session.createReceiver(getTestName(), "JMSPriority > 8");
+
+      // Surfaces any failure recorded by the validator above.
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   /**
+    * Attaches a receiver with the noLocal flag set and uses an
+    * {@link AmqpValidator} to check that the remote source reported on the
+    * opened link does NOT carry a NoLocal filter.
+    */
+   @Test(timeout = 60000)
+   public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public void inspectOpenedResource(Receiver receiver) {
+
+            Source source = (Source) receiver.getRemoteSource();
+            if (source == null) {
+               markAsInvalid("Link opened with null source.");
+               // Nothing left to inspect; returning here avoids a
+               // NullPointerException on the filter lookup below.
+               return;
+            }
+
+            Map<Symbol, Object> filters = source.getFilter();
+
+            // Currently don't support noLocal on a Queue, so the filter is
+            // expected to be absent; its presence is the failure case.
+            if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
+               markAsInvalid("Broker should not return the NoLocal Filter on Attach");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      session.createReceiver(getTestName(), null, true);
+
+      // Surfaces any failure recorded by the validator above.
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   /**
+    * Reads one message without accepting it and checks that the queue still
+    * reports a message count of one after the receiver has been closed.
+    */
+   @Test(timeout = 60000)
+   public void testQueueReceiverReadMessage() throws Exception {
+      final String queueName = getTestName();
+      sendMessages(queueName, 1);
+
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createReceiver(queueName);
+
+      Queue queue = getProxyToQueue(queueName);
+      assertEquals(1, queue.getMessageCount());
+
+      // Grant a single credit, take the delivery, then close with no accept.
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      receiver.close();
+
+      assertEquals(1, queue.getMessageCount());
+
+      connection.close();
+   }
+
+   /**
+    * Two receivers on one session each read two messages without accepting
+    * them. The queue is then expected to report zero acknowledgements and,
+    * once both receivers are closed, still hold all four messages.
+    */
+   @Test(timeout = 60000)
+   public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
+      final int messageCount = 4;
+      final String queueName = getTestName();
+      sendMessages(queueName, messageCount);
+
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver first = session.createReceiver(queueName);
+
+      Queue queue = getProxyToQueue(queueName);
+      assertEquals(messageCount, queue.getMessageCount());
+
+      first.flow(2);
+      for (int i = 0; i < 2; i++) {
+         assertNotNull(first.receive(5, TimeUnit.SECONDS));
+      }
+
+      AmqpReceiver second = session.createReceiver(queueName);
+
+      assertEquals(2, server.getTotalConsumerCount());
+
+      second.flow(2);
+      for (int i = 0; i < 2; i++) {
+         assertNotNull(second.receive(5, TimeUnit.SECONDS));
+      }
+
+      // Nothing was accepted, so no acknowledgements should be recorded.
+      assertEquals(0, queue.getMessagesAcknowledged());
+
+      first.close();
+      second.close();
+
+      assertEquals(messageCount, queue.getMessageCount());
+
+      connection.close();
+   }
+
+   /**
+    * Two receivers on one session each read and accept two of the four queued
+    * messages. The test waits for the queue's acknowledged counter to reach
+    * two and then four, and finally expects the queue to be empty.
+    */
+   @Test(timeout = 60000)
+   public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
+      final int messageCount = 4;
+      final String queueName = getTestName();
+      sendMessages(queueName, messageCount);
+
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver first = session.createReceiver(queueName);
+
+      final Queue queue = getProxyToQueue(queueName);
+      assertEquals(messageCount, queue.getMessageCount());
+
+      first.flow(2);
+      for (int i = 0; i < 2; i++) {
+         AmqpMessage delivery = first.receive(5, TimeUnit.SECONDS);
+         assertNotNull(delivery);
+         delivery.accept();
+      }
+
+      // Acknowledgements are observed by polling the queue's counter.
+      Wait.Condition twoAcked = new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return queue.getMessagesAcknowledged() == 2;
+         }
+      };
+      assertTrue("Should have ack'd two", Wait.waitFor(twoAcked, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+      AmqpReceiver second = session.createReceiver(queueName);
+
+      assertEquals(2, server.getTotalConsumerCount());
+
+      second.flow(2);
+      for (int i = 0; i < 2; i++) {
+         AmqpMessage delivery = second.receive(5, TimeUnit.SECONDS);
+         assertNotNull(delivery);
+         delivery.accept();
+      }
+
+      Wait.Condition fourAcked = new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return queue.getMessagesAcknowledged() == 4;
+         }
+      };
+      assertTrue("Queue should be empty now", Wait.waitFor(fourAcked, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
+
+      first.close();
+      second.close();
+
+      assertEquals(0, queue.getMessageCount());
+
+      connection.close();
+   }
+
+   /**
+    * A first receiver is granted credit and closed once the queue reports at
+    * least two messages in delivery. A second receiver then reads and accepts
+    * two messages, and the queue is expected to hold all but those two.
+    */
+   @Test(timeout = 60000)
+   public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
+      final int messageCount = 20;
+      final String queueName = getTestName();
+      sendMessages(queueName, messageCount);
+
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver first = session.createReceiver(queueName);
+
+      final Queue queue = getProxyToQueue(queueName);
+      assertEquals(messageCount, queue.getMessageCount());
+
+      first.flow(20);
+
+      // Wait until the broker has started dispatching to the first receiver.
+      Wait.Condition dispatched = new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return queue.getDeliveringCount() >= 2;
+         }
+      };
+      assertTrue("Should have dispatch to prefetch", Wait.waitFor(dispatched, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+      first.close();
+
+      AmqpReceiver second = session.createReceiver(queueName);
+
+      assertEquals(1, server.getTotalConsumerCount());
+
+      second.flow(messageCount * 2);
+      for (int i = 0; i < 2; i++) {
+         AmqpMessage delivery = second.receive(5, TimeUnit.SECONDS);
+         assertNotNull(delivery);
+         delivery.accept();
+      }
+
+      Wait.Condition twoAcked = new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return queue.getMessagesAcknowledged() == 2;
+         }
+      };
+      assertTrue("Should have ack'd two", Wait.waitFor(twoAcked, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+      second.close();
+
+      assertEquals(messageCount - 2, queue.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
    public void testSimpleSendOneReceiveOne() throws Exception {
 
       AmqpClient client = createAmqpClient();
@@ -476,7 +734,7 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
       assertTrue("Should be no inflight messages: " + 
destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
 
          @Override
-         public boolean isSatisified() throws Exception {
+         public boolean isSatisfied() throws Exception {
             return destinationView.getDeliveringCount() == 0;
          }
       }));
@@ -554,4 +812,21 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
 
       connection.close();
    }
+
+   /**
+    * Sends {@code count} messages to the named destination over a connection
+    * opened for this call; the connection is closed before returning, whether
+    * or not sending succeeds.
+    *
+    * @param destinationName address the sender is created on
+    * @param count number of messages to send; ids are "MessageID:0" upwards
+    * @throws Exception if connecting, creating the sender, or sending fails
+    */
+   public void sendMessages(String destinationName, int count) throws Exception {
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      try {
+         AmqpSender sender = connection.createSession().createSender(destinationName);
+
+         int sent = 0;
+         while (sent < count) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + sent);
+            sender.send(message);
+            sent++;
+         }
+      } finally {
+         connection.close();
+      }
+   }
 }

Reply via email to