Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8c310a2ce -> 7374d2f72


ARTEMIS-60 Validate AMQP sender applied TransactionState 

Update the AMQP test client to allow for better inspection of the
delivery updates that happen during normal use.  Use those modification
to check that when the broker's sender accepts and settles a non-settled
disposition it adds a proper TransactionState disposition with the
correct outcome and txn-id in that state.  

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a0948928
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a0948928
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a0948928

Branch: refs/heads/master
Commit: a0948928c3b34788e6f371fd49eb4d07273ba08b
Parents: 8c310a2
Author: Timothy Bish <[email protected]>
Authored: Mon Mar 20 16:46:18 2017 -0400
Committer: Timothy Bish <[email protected]>
Committed: Mon Mar 20 16:46:18 2017 -0400

----------------------------------------------------------------------
 .../amqp/client/AmqpAbstractResource.java       | 12 +++-
 .../transport/amqp/client/AmqpConnection.java   |  3 +-
 .../transport/amqp/client/AmqpEventSink.java    |  5 +-
 .../transport/amqp/client/AmqpReceiver.java     |  4 +-
 .../transport/amqp/client/AmqpSender.java       | 14 +---
 .../amqp/client/AmqpTransactionCoordinator.java | 11 ++--
 .../integration/amqp/AmqpTransactionTest.java   | 67 ++++++++++++++++++++
 7 files changed, 94 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 0ab4596..691c11f 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -242,7 +242,8 @@ public abstract class AmqpAbstractResource<E extends 
Endpoint> implements AmqpRe
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery 
delivery) throws IOException {
+      doDeliveryUpdate(delivery);
    }
 
    @Override
@@ -305,7 +306,14 @@ public abstract class AmqpAbstractResource<E extends 
Endpoint> implements AmqpRe
    }
 
    protected void doDeliveryUpdate(Delivery delivery) {
-
+      AmqpValidator validator = getStateInspector();
+      if (validator != null) {
+         try {
+            validator.inspectDeliveryUpdate(delivery);
+         } catch (Throwable error) {
+            validator.markAsInvalid(error.getMessage());
+         }
+      }
    }
 
    //----- Private implementation utility methods ---------------------------//

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index fa44c02..76717fd 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -43,6 +43,7 @@ import 
org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Event.Type;
@@ -697,7 +698,7 @@ public class AmqpConnection extends 
AmqpAbstractResource<Connection> implements
                   break;
                case DELIVERY:
                   amqpEventSink = (AmqpEventSink) 
protonEvent.getLink().getContext();
-                  amqpEventSink.processDeliveryUpdates(this);
+                  amqpEventSink.processDeliveryUpdates(this, (Delivery) 
protonEvent.getContext());
                   break;
                default:
                   break;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
index 1c511a5..5581328 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
@@ -18,6 +18,8 @@ package org.apache.activemq.transport.amqp.client;
 
 import java.io.IOException;
 
+import org.apache.qpid.proton.engine.Delivery;
+
 /**
  * Interface used by classes that want to process AMQP events sent from
  * the transport layer.
@@ -53,9 +55,10 @@ public interface AmqpEventSink {
     * for the given endpoint.
     *
     * @param connection the AmqpConnection instance for easier access to fire 
events.
+    * @param delivery the Delivery that was updated.
     * @throws IOException if an error occurs while processing the update.
     */
-   void processDeliveryUpdates(AmqpConnection connection) throws IOException;
+   void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) 
throws IOException;
 
    /**
     * Called when the Proton Engine signals an Flow related event has been 
triggered

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index cd76501..414f933 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -794,7 +794,7 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery 
delivery) throws IOException {
       Delivery incoming = null;
       do {
          incoming = getEndpoint().current();
@@ -823,7 +823,7 @@ public class AmqpReceiver extends 
AmqpAbstractResource<Receiver> {
          }
       } while (incoming != null);
 
-      super.processDeliveryUpdates(connection);
+      super.processDeliveryUpdates(connection, delivery);
    }
 
    private void processDelivery(Delivery incoming) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 350a201..0a41ce6 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
-import javax.jms.InvalidDestinationException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
@@ -26,6 +25,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.jms.InvalidDestinationException;
+
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
@@ -419,7 +420,7 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery 
updated) throws IOException {
       List<Delivery> toRemove = new ArrayList<>();
 
       for (Delivery delivery : pending) {
@@ -485,13 +486,4 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
    public String toString() {
       return getClass().getSimpleName() + "{ address = " + address + "}";
    }
-
-   @Override
-   protected void doDeliveryUpdate(Delivery delivery) {
-      try {
-         getStateInspector().inspectDeliveryUpdate(delivery);
-      } catch (Throwable error) {
-         getStateInspector().markAsInvalid(error.getMessage());
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
index 2e1a3ab..bc1030e 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.TransactionRolledBackException;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.util.HashMap;
@@ -27,6 +24,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.TransactionRolledBackException;
+
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -67,7 +68,7 @@ public class AmqpTransactionCoordinator extends 
AmqpAbstractResource<Sender> {
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws 
IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery 
delivery) throws IOException {
       try {
          Iterator<Delivery> deliveries = pendingDeliveries.iterator();
          while (deliveries.hasNext()) {
@@ -112,7 +113,7 @@ public class AmqpTransactionCoordinator extends 
AmqpAbstractResource<Sender> {
             deliveries.remove();
          }
 
-         super.processDeliveryUpdates(connection);
+         super.processDeliveryUpdates(connection, delivery);
       } catch (Exception e) {
          throw IOExceptionSupport.create(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index d49d499..3a9d498 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
@@ -920,6 +921,72 @@ public class AmqpTransactionTest extends 
AmqpClientTestSupport {
          sendConnection.close();
          consumerConnection.close();
       }
+   }
+
+   @Test(timeout = 30000)
+   public void testUnsettledTXMessageGetTransactedDispostion() throws 
Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message");
+      sender.send(message);
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.setStateInspector(new AmqpValidator() {
 
+         @Override
+         public void inspectDeliveryUpdate(Delivery delivery) {
+            if (delivery.remotelySettled()) {
+               LOG.info("Receiver got delivery update for: {}", delivery);
+               if (!(delivery.getRemoteState() instanceof TransactionalState)) 
{
+                  markAsInvalid("Transactionally acquire work no tagged as 
being in a transaction.");
+               } else {
+                  TransactionalState txState = (TransactionalState) 
delivery.getRemoteState();
+                  if (!(txState.getOutcome() instanceof Accepted)) {
+                     markAsInvalid("Transaction state lacks any outcome");
+                  } else if (txState.getTxnId() == null) {
+                     markAsInvalid("Transaction state lacks any TX Id");
+                  }
+               }
+
+               if (!(delivery.getLocalState() instanceof TransactionalState)) {
+                  markAsInvalid("Transactionally acquire work no tagged as 
being in a transaction.");
+               } else {
+                  TransactionalState txState = (TransactionalState) 
delivery.getLocalState();
+                  if (!(txState.getOutcome() instanceof Accepted)) {
+                     markAsInvalid("Transaction state lacks any outcome");
+                  } else if (txState.getTxnId() == null) {
+                     markAsInvalid("Transaction state lacks any TX Id");
+                  }
+               }
+
+               TransactionalState localTxState = (TransactionalState) 
delivery.getLocalState();
+               TransactionalState remoteTxState = (TransactionalState) 
delivery.getRemoteState();
+
+               if (!localTxState.getTxnId().equals(remoteTxState)) {
+                  markAsInvalid("Message not enrolled in expected 
transaction");
+               }
+            }
+         }
+      });
+
+      session.begin();
+
+      assertTrue(session.isInTransaction());
+
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.accept(false);
+
+      session.commit();
+
+      sender.getStateInspector().assertValid();
+
+      connection.close();
    }
 }

Reply via email to