Repository: qpid-jms
Updated Branches:
  refs/heads/master 3ffb4e5ef -> 2d5f0632d


Adds a small test case that sends a set of large messages, each assigned
a group ID and a group sequence number.  Currently the second receive fails.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2d5f0632
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2d5f0632
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2d5f0632

Branch: refs/heads/master
Commit: 2d5f0632d25f3117eb998b8027156327fc7967be
Parents: 3ffb4e5
Author: Timothy Bish <[email protected]>
Authored: Tue Jan 20 10:54:13 2015 -0500
Committer: Timothy Bish <[email protected]>
Committed: Tue Jan 20 10:54:13 2015 -0500

----------------------------------------------------------------------
 .../usecases/JmsLargeMessagesInGroupsTest.java  | 96 ++++++++++++++++++++
 1 file changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2d5f0632/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessagesInGroupsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessagesInGroupsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessagesInGroupsTest.java
new file mode 100644
index 0000000..e8d8fa5
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessagesInGroupsTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.usecases;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends batches of large persistent {@link BytesMessage}s that all carry the
+ * same JMSXGroupID and an increasing JMSXGroupSeq, then reads each batch back
+ * and logs the group properties seen on every received message.
+ */
+@Ignore("Currently fails")
+public class JmsLargeMessagesInGroupsTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsLargeMessagesInGroupsTest.class);
+
+    private static final int MESSAGE_COUNT = 10;
+    private static final int MESSAGE_SIZE = 100 * 1024;
+    private static final int RECEIVE_TIMEOUT = 5000;
+    private static final String JMSX_GROUP_ID = "JmsGroupsTest";
+
+    /**
+     * Runs two consecutive send/receive rounds of {@link #MESSAGE_COUNT}
+     * grouped messages over a single connection.
+     *
+     * @throws Exception if any JMS operation fails.
+     */
+    @Test
+    public void testGroupSeqIsNeverLost() throws Exception {
+        connection = createAmqpConnection();
+        // JMS only delivers messages to consumers of a started connection.
+        // NOTE(review): harmless if createAmqpConnection() already started it - confirm.
+        connection.start();
+
+        sendMessagesToBroker(MESSAGE_COUNT);
+        readMessagesOnBroker(MESSAGE_COUNT);
+        sendMessagesToBroker(MESSAGE_COUNT);
+        readMessagesOnBroker(MESSAGE_COUNT);
+    }
+
+    /**
+     * Receives {@code count} messages from the test destination, failing if any
+     * single receive does not complete within {@link #RECEIVE_TIMEOUT} ms.
+     *
+     * @param count number of messages expected on the destination.
+     * @throws Exception if any JMS operation fails.
+     */
+    protected void readMessagesOnBroker(int count) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            Queue queue = session.createQueue(getDestinationName());
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            for (int i = 0; i < count; ++i) {
+                Message message = consumer.receive(RECEIVE_TIMEOUT);
+                assertNotNull("Message #" + i + " was not received within " + RECEIVE_TIMEOUT + " ms", message);
+                LOG.info("Read message #{}: type = {}", i, message.getClass().getSimpleName());
+                String gid = message.getStringProperty("JMSXGroupID");
+                String seq = message.getStringProperty("JMSXGroupSeq");
+                LOG.info("Message assigned JMSXGroupID := {}", gid);
+                LOG.info("Message assigned JMSXGroupSeq := {}", seq);
+            }
+        } finally {
+            // Closing the session also closes its consumer, so no consumer from
+            // this round is left attached to the queue when the next round runs.
+            session.close();
+        }
+    }
+
+    /**
+     * Sends {@code count} persistent bytes messages of {@link #MESSAGE_SIZE}
+     * bytes each, tagged with {@link #JMSX_GROUP_ID} and a JMSXGroupSeq that
+     * runs from 1 to {@code count}.
+     *
+     * @param count number of messages to send.
+     * @throws Exception if any JMS operation fails.
+     */
+    protected void sendMessagesToBroker(int count) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            Queue queue = session.createQueue(getDestinationName());
+            MessageProducer producer = session.createProducer(queue);
+
+            // Payload is the repeating ASCII digit sequence "0123456789".
+            byte[] buffer = new byte[MESSAGE_SIZE];
+            for (int i = 0; i < buffer.length; i++) {
+                buffer[i] = (byte) ('0' + (i % 10));
+            }
+
+            LOG.debug("Sending {} messages to destination: {}", count, queue);
+            for (int i = 1; i <= count; i++) {
+                BytesMessage message = session.createBytesMessage();
+                message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+                message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
+                message.setIntProperty("JMSXGroupSeq", i);
+                message.writeBytes(buffer);
+                producer.send(message);
+            }
+        } finally {
+            // Closing the session also closes the producer created from it.
+            session.close();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to