Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 461f842a6 -> 4b76f5408


QPID-6933: [System Tests] Refactor DUPS_OK_ACKNOWLEDGEDUPS_OK tests as JMS 1.1 
system test


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/4b76f540
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/4b76f540
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/4b76f540

Branch: refs/heads/master
Commit: 4b76f54084a7b3ff453a7279626ab96cecb998cf
Parents: 461f842
Author: Alex Rudyy <[email protected]>
Authored: Sat Jan 6 14:29:39 2018 +0000
Committer: Alex Rudyy <[email protected]>
Committed: Sat Jan 6 14:29:39 2018 +0000

----------------------------------------------------------------------
 .../systests/jms_1_1/consumer/DupsOkTest.java   | 122 +++++++++++++
 .../org/apache/qpid/test/client/DupsOkTest.java | 177 -------------------
 2 files changed, 122 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4b76f540/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/consumer/DupsOkTest.java
----------------------------------------------------------------------
diff --git 
a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/consumer/DupsOkTest.java
 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/consumer/DupsOkTest.java
new file mode 100644
index 0000000..9e95121
--- /dev/null
+++ 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/consumer/DupsOkTest.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.consumer;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class DupsOkTest extends JmsTestBase
+{
+
+    @Test
+    public void synchronousReceive() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        final int numberOfMessages = 3;
+        try
+        {
+            connection.start();
+            Utils.sendMessages(connection, queue, numberOfMessages);
+
+            Session session = connection.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+                Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Expected message (%d) not 
received", i), received);
+                assertEquals("Unexpected message received", i, 
received.getIntProperty(INDEX));
+            }
+
+            assertNull("Received too many messages", 
consumer.receive(getReceiveTimeout()/4));
+
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
+    public void asynchronousReceive() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        final int numberOfMessages = 3;
+        try
+        {
+            connection.start();
+            Utils.sendMessages(connection, queue, numberOfMessages);
+
+            Session session = connection.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            AtomicReference<Throwable> exception = new AtomicReference<>();
+            CountDownLatch completionLatch = new 
CountDownLatch(numberOfMessages);
+            AtomicInteger expectedIndex = new AtomicInteger();
+
+            consumer.setMessageListener(message -> {
+                try
+                {
+                    Object index = message.getObjectProperty(INDEX);
+                    assertEquals("Unexpected message received", 
expectedIndex.getAndIncrement(), message.getIntProperty(INDEX));
+                }
+                catch (Throwable e)
+                {
+                    exception.set(e);
+                }
+                finally
+                {
+                    completionLatch.countDown();
+                }
+            });
+
+            boolean completed = completionLatch.await(getReceiveTimeout() * 
numberOfMessages, TimeUnit.MILLISECONDS);
+            assertTrue("Message listener did not receive all messages within 
expected", completed);
+            assertNull("Message listener encountered unexpected exception", 
exception.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4b76f540/systests/src/test/java/org/apache/qpid/test/client/DupsOkTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/DupsOkTest.java 
b/systests/src/test/java/org/apache/qpid/test/client/DupsOkTest.java
deleted file mode 100644
index a8ef28e..0000000
--- a/systests/src/test/java/org/apache/qpid/test/client/DupsOkTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-
-public class DupsOkTest extends QpidBrokerTestCase
-{
-
-    private Queue _queue;
-    private static final int MSG_COUNT = 100;
-    private CountDownLatch _awaitCompletion = new CountDownLatch(1);
-
-    @Override
-    public void setUp() throws Exception
-    {
-        super.setUp();
-
-        //Declare the queue
-        Connection consumerConnection = getConnection();
-        Session session = 
consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        _queue = createTestQueue(session);
-        session.close();
-
-        //Create Producer put some messages on the queue
-        Connection producerConnection = getConnection();
-
-        producerConnection.start();
-
-        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        MessageProducer producer = producerSession.createProducer(_queue);
-
-        for (int count = 1; count <= MSG_COUNT; count++)
-        {
-            Message msg = producerSession.createTextMessage("Message " + 
count);
-            msg.setIntProperty("count", count);
-            producer.send(msg);
-        }
-
-        producerConnection.close();
-    }
-
-    /**
-     * This test sends x messages and receives them with an async consumer.
-     * Waits for all messages to be received or for 60 s
-     * and checks whether the queue is empty.
-     *
-     * @throws Exception
-     */
-    public void testDupsOK() throws Exception
-    {
-        //Create Client
-        Connection clientConnection = getConnection();
-
-        final Session clientSession = clientConnection.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
-
-        MessageConsumer consumer = clientSession.createConsumer(_queue);
-
-        if(!isBroker10())
-        {
-            assertEquals("The queue should have msgs at start",
-                         MSG_COUNT,
-                         ((AMQSession) 
clientSession).getQueueDepth((AMQDestination) _queue));
-        }
-
-        clientConnection.start();
-
-        consumer.setMessageListener(new MessageListener()
-        {
-            private int _msgCount = 0;
-
-            @Override
-            public void onMessage(Message message)
-            {
-                _msgCount++;
-                if (message == null)
-                {
-                    fail("Should not get null messages");
-                }
-
-                if (message instanceof TextMessage)
-                {
-                    try
-                    {
-                        if (message.getIntProperty("count") == MSG_COUNT)
-                        {
-                            try
-                            {
-                                if(_msgCount != MSG_COUNT)
-                                {
-                                    assertEquals("Wrong number of messages 
seen.", MSG_COUNT, _msgCount);
-                                }
-                            }
-                            finally
-                            {
-                                //This is the last message so release test.
-                                _awaitCompletion.countDown();
-                            }
-                        }
-                    }
-                    catch (JMSException e)
-                    {
-                        fail("Unable to get int property 'count'");
-                    }
-                }
-                else
-                {
-                    fail("Got wrong message type");
-                }
-            }
-        });
-
-        try
-        {
-            if (!_awaitCompletion.await(120, TimeUnit.SECONDS))
-            {
-                fail("Test did not complete in 120 seconds");
-            }
-        }
-        catch (InterruptedException e)
-        {
-            fail("Unable to wait for test completion");
-            throw e;
-        }
-
-        //Close consumer to give broker time to process in bound Acks. As The 
main thread will be released while
-        // before the dispatcher has sent the ack back to the broker.
-        consumer.close();
-
-        clientSession.close();
-
-        final Session clientSession2 = clientConnection.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
-        if(!isBroker10())
-        {
-            assertEquals("The queue should have 0 msgs left",
-                         0,
-                         ((AMQSession) 
clientSession2).getQueueDepth((AMQDestination) _queue));
-        }
-        clientConnection.close();
-    }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to