Repository: activemq
Updated Branches:
  refs/heads/master 9f812a210 -> 5d53aa2d1


NO-JIRA: Add some more variants of the .NET transaction tests

Adds ability to not settle accepted messages on the client to enable
creation of tests that are equivalent to the AmqpNetLite client's
transaction tests which hold settlement and expect the resource to
handle it on successful discharge.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5d53aa2d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5d53aa2d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5d53aa2d

Branch: refs/heads/master
Commit: 5d53aa2d11edb3b819b4ee862a10c7bd1532e805
Parents: 9f812a2
Author: Timothy Bish <tabish...@gmail.com>
Authored: Mon Sep 19 17:36:58 2016 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Mon Sep 19 17:36:58 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  31 +++-
 .../transport/amqp/client/AmqpReceiver.java     |  44 +++++-
 .../amqp/interop/AmqpTransactionTest.java       | 153 +++++++++++++++++++
 3 files changed, 221 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5d53aa2d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 8b378e1..2b1b874 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -132,15 +132,28 @@ public class AmqpMessage {
      * @throws Exception if an error occurs during the accept.
      */
     public void accept() throws Exception {
+        accept(true);
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param settle
+     *      true if the client should also settle the delivery when sending 
the accept.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(boolean settle) throws Exception {
         if (receiver == null) {
             throw new IllegalStateException("Can't accept non-received 
message.");
         }
 
-        receiver.accept(delivery);
+        receiver.accept(delivery, settle);
     }
 
     /**
-     * Accepts the message marking it as consumed on the remote peer.
+     * Accepts the message marking it as consumed on the remote peer.  This 
method
+     * will automatically settle the accepted delivery.
      *
      * @param session
      *      The session that is used to manage acceptance of the message.
@@ -148,11 +161,23 @@ public class AmqpMessage {
      * @throws Exception if an error occurs during the accept.
      */
     public void accept(AmqpSession txnSession) throws Exception {
+        accept(txnSession, true);
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param session
+     *      The session that is used to manage acceptance of the message.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(AmqpSession txnSession, boolean settle) throws 
Exception {
         if (receiver == null) {
             throw new IllegalStateException("Can't accept non-received 
message.");
         }
 
-        receiver.accept(delivery, txnSession);
+        receiver.accept(delivery, txnSession, settle);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/5d53aa2d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 999e033..3543ae3 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -414,20 +414,34 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
     }
 
     /**
-     * Accepts a message that was dispatched under the given Delivery instance.
+     * Accepts a message that was dispatched under the given Delivery instance 
and settles the delivery.
      *
      * @param delivery
      *        the Delivery instance to accept.
      *
      * @throws IOException if an error occurs while sending the accept.
      */
-    public void accept(final Delivery delivery) throws IOException {
-        accept(delivery, this.session);
+    public void accept(Delivery delivery) throws IOException {
+        accept(delivery, this.session, true);
     }
 
     /**
      * Accepts a message that was dispatched under the given Delivery instance.
      *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param settle
+     *        true if the receiver should settle the delivery or just send the 
disposition.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(Delivery delivery, boolean settle) throws IOException {
+        accept(delivery, this.session, settle);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance 
and settles the delivery.
+     *
      * This method allows for the session that is used in the accept to be 
specified by the
      * caller.  This allows for an accepted message to be involved in a 
transaction that is
      * being managed by some other session other than the one that created 
this receiver.
@@ -440,6 +454,26 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
      * @throws IOException if an error occurs while sending the accept.
      */
     public void accept(final Delivery delivery, final AmqpSession session) 
throws IOException {
+        accept(delivery, session, true);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.
+     *
+     * This method allows for the session that is used in the accept to be 
specified by the
+     * caller.  This allows for an accepted message to be involved in a 
transaction that is
+     * being managed by some other session other than the one that created 
this receiver.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param session
+     *        the session under which the message is being accepted.
+     * @param settle
+     *        true if the receiver should settle the delivery or just send the 
disposition.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(final Delivery delivery, final AmqpSession session, 
final boolean settle) throws IOException {
         checkClosed();
 
         if (delivery == null) {
@@ -469,11 +503,13 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
                                 txState.setOutcome(Accepted.getInstance());
                                 txState.setTxnId(txnId);
                                 delivery.disposition(txState);
-                                delivery.settle();
                                 
session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
                             }
                         } else {
                             delivery.disposition(Accepted.getInstance());
+                        }
+
+                        if (settle) {
                             delivery.settle();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5d53aa2d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 7cf6026..994a2e7 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -574,4 +575,156 @@ public class AmqpTransactionTest extends 
AmqpClientTestSupport {
 
         connection.close();
     }
+
+    // TODO - Direct ports of the AmqpNetLite client tests that don't 
currently with this broker.
+
+    @Ignore("Fails due to no support for TX enrollment without settlement.")
+    @Test(timeout = 60000)
+    public void 
testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() 
throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + 
getTestName());
+        ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+        receiver.flow((NUM_MESSAGES + 2) * 2);
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+        }
+
+        // Commit half the consumed messages
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession, false);
+        }
+        txnSession.commit();
+
+        // Rollback the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession, false);
+        }
+        txnSession.rollback();
+
+        // After rollback message should still be acquired so we read last 
sent message.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, 
message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        // Commit the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.commit();
+
+        // The final message should still be pending.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, 
message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // We should have now drained the Queue
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        receiver.flow(1);
+        assertNull(message);
+
+        connection.close();
+    }
+
+    @Ignore("Fails due to no support for TX enrollment without settlement.")
+    @Test(timeout = 60000)
+    public void 
testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws 
Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + 
getTestName());
+        receiver.flow(2);
+        AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+        // Accept the first one in a TXN and send a new message in that TXN as 
well
+        txnSession.begin();
+        {
+            message1.accept(txnSession, false);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Accept the second one in a TXN and send a new message in that TXN 
as well but rollback
+        txnSession.begin();
+        {
+            message2.accept(txnSession, false);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        message2.release();
+
+        // Should be two message available for dispatch given that we sent and 
committed one, and
+        // releases another we had previously received.
+        receiver.flow(2);
+        for (int i = 1; i <= NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(i, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // Should be nothing left.
+        receiver.flow(1);
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
 }

Reply via email to