Repository: qpid-jms Updated Branches: refs/heads/master a6d030753 -> d51d41911
add test demonstrating ordering issue after rollback Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d51d4191 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d51d4191 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d51d4191 Branch: refs/heads/master Commit: d51d419112b8544eb03a208f21033c01153b44c5 Parents: a6d0307 Author: Robert Gemmell <[email protected]> Authored: Wed Nov 26 10:20:57 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Wed Nov 26 10:20:57 2014 +0000 ---------------------------------------------------------------------- .../qpid/jms/support/QpidJmsTestSupport.java | 5 ++- .../transactions/JmsTransactedConsumerTest.java | 42 ++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d51d4191/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java index 509a551..f41c778 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java @@ -67,6 +67,8 @@ import org.slf4j.LoggerFactory; */ public class QpidJmsTestSupport { + public static final String MESSAGE_NUMBER = "MessageNumber"; + public static final String KAHADB_DIRECTORY = "target/activemq-data"; @Rule public TestName name = new TestName(); @@ -258,9 +260,10 @@ public class QpidJmsTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer p = session.createProducer(destination); - for (int i = 0; i < count; i++) { + for (int i = 1; i <= count; i++) { TextMessage message = session.createTextMessage(); message.setText("TextMessage: " + i); + message.setIntProperty(MESSAGE_NUMBER, i); p.send(message); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d51d4191/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index cfe6328..c4b61fa 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ -29,6 +29,8 @@ import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.QpidJmsTestSupport; +import org.junit.Ignore; import org.junit.Test; /** @@ -162,6 +164,46 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { assertEquals(0, proxy.getQueueSize()); } + @Ignore //TODO: enable after fixing ordering issue + @Test(timeout = 60000) + public void testReceiveSomeThenRollback() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + int totalCount = 5; + int consumeBeforeRollback = 2; + sendToAmqQueue(totalCount); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(totalCount, proxy.getQueueSize()); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + for(int i = 1; i <= consumeBeforeRollback; i++) { + Message message = consumer.receive(1000); + assertNotNull(message); + assertEquals("Unexpected message number", i, message.getIntProperty(QpidJmsTestSupport.MESSAGE_NUMBER)); + } + + session.rollback(); + + assertEquals(totalCount, proxy.getQueueSize()); + + // Consume again.. the previously consumed messages should get delivered + // again after the rollback and then the remainder should follow + for(int i = 1; i <= totalCount; i++) { + Message message = consumer.receive(1000); + assertNotNull(message); + assertEquals("Unexpected message number after rollback", i, message.getIntProperty(QpidJmsTestSupport.MESSAGE_NUMBER)); + } + + session.commit(); + + assertEquals(0, proxy.getQueueSize()); + } + @Test(timeout = 60000) public void testCloseConsumerBeforeCommit() throws Exception { connection = createAmqpConnection(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
