NO-JIRA: AMQP test updates. Adds support for doing sends and receives that are enrolled in a transaction created in a session other than the session that created the sender or receiver. Adds some tests that show this in action.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fa551498 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fa551498 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fa551498 Branch: refs/heads/activemq-5.14.x Commit: fa5514985d74ff2d0bfd7faebb28452a2a7eed2e Parents: 14c5c52 Author: Timothy Bish <[email protected]> Authored: Wed Sep 14 18:23:52 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Sep 27 12:15:06 2016 -0400 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpMessage.java | 19 +- .../transport/amqp/client/AmqpReceiver.java | 26 +++ .../transport/amqp/client/AmqpSender.java | 29 ++- .../transport/amqp/client/AmqpSession.java | 8 +- .../amqp/interop/AmqpTransactionTest.java | 194 +++++++++++++++++++ 5 files changed, 269 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 99f4cfb..8b378e1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -140,6 +140,22 @@ public class AmqpMessage { } /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param session + * The session that is used to manage acceptance of the message. + * + * @throws Exception if an error occurs during the accept. 
+ */ + public void accept(AmqpSession txnSession) throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't accept non-received message."); + } + + receiver.accept(delivery, txnSession); + } + + /** * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here. * * @param deliveryFailed @@ -374,7 +390,7 @@ public class AmqpMessage { * @param key * the name used to lookup the property in the application properties. * - * @return the propety value or null if not set. + * @return the property value or null if not set. */ public Object getApplicationProperty(String key) { if (applicationPropertiesMap == null) { @@ -560,6 +576,7 @@ public class AmqpMessage { message.setHeader(new Header()); } } + private void lazyCreateProperties() { if (message.getProperties() == null) { message.setProperties(new Properties()); http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 77a529d..999e033 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -422,12 +422,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { * @throws IOException if an error occurs while sending the accept. */ public void accept(final Delivery delivery) throws IOException { + accept(delivery, this.session); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * This method allows for the session that is used in the accept to be specified by the + * caller. 
This allows for an accepted message to be involved in a transaction that is + * being managed by some other session other than the one that created this receiver. + * + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery, final AmqpSession session) throws IOException { checkClosed(); if (delivery == null) { throw new IllegalArgumentException("Delivery to accept cannot be null"); } + if (session == null) { + throw new IllegalArgumentException("Session given cannot be null"); + } + + if (session.getConnection() != this.session.getConnection()) { + throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver."); + } + final ClientFuture request = new ClientFuture(); session.getScheduler().execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index f9d6435..dd3a371 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -127,6 +127,21 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { */ public void send(final AmqpMessage message) throws IOException { checkClosed(); + send(message, null); + } + + /** + * Sends the given message to this senders assigned address using the supplied transaction ID. + * + * @param message + * the message to send. 
+ * @param txId + * the transaction ID to assign the outgoing send. + * + * @throws IOException if an error occurs during the send. + */ + public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException { + checkClosed(); final ClientFuture sendRequest = new ClientFuture(); session.getScheduler().execute(new Runnable() { @@ -134,7 +149,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { @Override public void run() { try { - doSend(message, sendRequest); + doSend(message, sendRequest, txId); session.pumpToProtonTransport(sendRequest); } catch (Exception e) { sendRequest.onFailure(e); @@ -319,7 +334,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { } } - private void doSend(AmqpMessage message, AsyncResult request) throws Exception { + private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception { LOG.trace("Producer sending message: {}", message); Delivery delivery = null; @@ -332,8 +347,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { delivery.setContext(request); - if (session.isInTransaction()) { - Binary amqpTxId = session.getTransactionId().getRemoteTxId(); + Binary amqpTxId = null; + if (txId != null) { + amqpTxId = txId.getRemoteTxId(); + } else if (session.isInTransaction()) { + amqpTxId = session.getTransactionId().getRemoteTxId(); + } + + if (amqpTxId != null) { TransactionalState state = new TransactionalState(); state.setTxnId(amqpTxId); delivery.disposition(state); http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index ae99f65..3804603 100644 --- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -464,8 +464,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> { connection.pumpToProtonTransport(request); } - AmqpTransactionId getTransactionId() { - return txContext.getTransactionId(); + public AmqpTransactionId getTransactionId() { + if (txContext != null && txContext.isInTransaction()) { + return txContext.getTransactionId(); + } + + return null; } AmqpTransactionContext getTransactionContext() { http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index a998290..97089a9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -178,4 +178,198 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { sender.close(); connection.close(); } + + @Test(timeout = 60000) + public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Load up the Queue with some messages + { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + sender.send(message); + sender.send(message); + sender.close(); + } + + // Root TXN session controls all TXN send 
lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(3, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + assertTrue(txnSession.isInTransaction()); + + receiver1.flow(1); + receiver2.flow(1); + receiver3.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS); + + message1.accept(txnSession); + message2.accept(txnSession); + message3.accept(txnSession); + + assertEquals(3, queue.getQueueSize()); + + txnSession.commit(); + + assertEquals(0, queue.getQueueSize()); + } + + @Test(timeout = 60000) + public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Load up the Queue with some messages + { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + sender.send(message); + sender.send(message); + sender.close(); + } + + // Root TXN session controls all TXN send lifetimes. 
+ AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(3, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + assertTrue(txnSession.isInTransaction()); + + receiver1.flow(1); + receiver2.flow(1); + receiver3.flow(1); + + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS); + + message1.accept(txnSession); + message2.accept(txnSession); + message3.accept(txnSession); + + assertEquals(3, queue.getQueueSize()); + + txnSession.rollback(); + + assertEquals(3, queue.getQueueSize()); + } + + @Test(timeout = 60000) + public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. 
+ AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpSender sender1 = session1.createSender("queue://" + getTestName()); + AmqpSender sender2 = session2.createSender("queue://" + getTestName()); + AmqpSender sender3 = session3.createSender("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(0, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + + assertTrue(txnSession.isInTransaction()); + + sender1.send(message, txnSession.getTransactionId()); + sender2.send(message, txnSession.getTransactionId()); + sender3.send(message, txnSession.getTransactionId()); + + assertEquals(0, queue.getQueueSize()); + + txnSession.commit(); + + assertEquals(3, queue.getQueueSize()); + } + + @Test(timeout = 60000) + public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. 
+ AmqpSession txnSession = connection.createSession(); + + // Create some sender sessions + AmqpSession session1 = connection.createSession(); + AmqpSession session2 = connection.createSession(); + AmqpSession session3 = connection.createSession(); + + // Sender linked to each session + AmqpSender sender1 = session1.createSender("queue://" + getTestName()); + AmqpSender sender2 = session2.createSender("queue://" + getTestName()); + AmqpSender sender3 = session3.createSender("queue://" + getTestName()); + + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(0, queue.getQueueSize()); + + // Begin the transaction that all senders will operate in. + txnSession.begin(); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + + assertTrue(txnSession.isInTransaction()); + + sender1.send(message, txnSession.getTransactionId()); + sender2.send(message, txnSession.getTransactionId()); + sender3.send(message, txnSession.getTransactionId()); + + assertEquals(0, queue.getQueueSize()); + + txnSession.rollback(); + + assertEquals(0, queue.getQueueSize()); + } }
