Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 3b33705b8 -> af443d705


ARTEMIS-891 - upgrade proton to 0.16

https://issues.apache.org/jira/browse/ARTEMIS-891
(cherry picked from commit 881615e6464c4779917e275c2e9635442ccd77f4)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/af443d70
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/af443d70
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/af443d70

Branch: refs/heads/1.x
Commit: af443d70526989b3f3c759b0aaa9c44b33e69874
Parents: 3b33705
Author: Andy Taylor <[email protected]>
Authored: Wed Dec 14 18:11:09 2016 +0000
Committer: Clebert Suconic <[email protected]>
Committed: Wed Dec 14 14:30:21 2016 -0500

----------------------------------------------------------------------
 .../proton/ProtonServerReceiverContext.java     | 10 ++++
 pom.xml                                         |  2 +-
 .../transport/amqp/client/AmqpSender.java       | 34 +++++++++++++
 .../transport/amqp/client/AmqpSession.java      | 51 ++++++++++++++++++++
 .../amqp/client/util/UnmodifiableLink.java      | 30 ++++++++++++
 .../amqp/client/util/UnmodifiableReceiver.java  |  6 +++
 .../amqp/client/util/UnmodifiableSender.java    |  6 +++
 .../amqp/client/util/UnmodifiableSession.java   | 48 ++++++++++++++++++
 .../integration/amqp/AmqpSendReceiveTest.java   | 30 ++++++++++++
 9 files changed, 216 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0cc293a..c30fe5b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -33,6 +33,9 @@ import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class ProtonServerReceiverContext extends ProtonInitializable 
implements ProtonDeliveryHandler {
 
    private static final Logger log = 
Logger.getLogger(ProtonServerReceiverContext.class);
@@ -106,6 +109,13 @@ public class ProtonServerReceiverContext extends 
ProtonInitializable implements
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
          }
+         Symbol[] remoteDesiredCapabilities = 
receiver.getRemoteDesiredCapabilities();
+         if (remoteDesiredCapabilities != null) {
+            List<Symbol> list = Arrays.asList(remoteDesiredCapabilities);
+            if (list.contains(AmqpSupport.DELAYED_DELIVERY)) {
+               receiver.setOfferedCapabilities(new Symbol[] 
{AmqpSupport.DELAYED_DELIVERY});
+            }
+         }
       }
       flow(maxCreditAllocation, minCreditRefresh);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index de1e9d6..265b3ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
       <jgroups.version>3.6.9.Final</jgroups.version>
       <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
       <netty.version>4.1.5.Final</netty.version>
-      <proton.version>0.15.0</proton.version>
+      <proton.version>0.16.0</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
       <qpid.jms.version>0.11.0</qpid.jms.version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 9b2a70d..350a201 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -72,6 +73,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> 
{
    private final Set<Delivery> pending = new LinkedHashSet<>();
    private byte[] encodeBuffer = new byte[1024 * 8];
 
+   private Symbol[] desiredCapabilities;
+   private Symbol[] offeredCapabilities;
+   private Map<Symbol, Object> properties;
+
    /**
     * Create a new sender instance.
     *
@@ -231,6 +236,31 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
       this.sendTimeout = sendTimeout;
    }
 
+
+   public void setDesiredCapabilities(Symbol[] desiredCapabilities) {
+      if (getEndpoint() != null) {
+         throw new IllegalStateException("Endpoint already established");
+      }
+
+      this.desiredCapabilities = desiredCapabilities;
+   }
+
+   public void setOfferedCapabilities(Symbol[] offeredCapabilities) {
+      if (getEndpoint() != null) {
+         throw new IllegalStateException("Endpoint already established");
+      }
+
+      this.offeredCapabilities = offeredCapabilities;
+   }
+
+   public void setProperties(Map<Symbol, Object> properties) {
+      if (getEndpoint() != null) {
+         throw new IllegalStateException("Endpoint already established");
+      }
+
+      this.properties = properties;
+   }
+
    //----- Private Sender implementation ------------------------------------//
 
    private void checkClosed() {
@@ -265,6 +295,10 @@ public class AmqpSender extends 
AmqpAbstractResource<Sender> {
       }
       sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
+      sender.setDesiredCapabilities(desiredCapabilities);
+      sender.setOfferedCapabilities(offeredCapabilities);
+      sender.setProperties(properties);
+
       setEndpoint(sender);
 
       super.doOpen();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index fc3fdf7..0863fb0 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.client;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.engine.Connection;
@@ -102,16 +104,47 @@ public class AmqpSession extends 
AmqpAbstractResource<Session> {
    /**
     * Create a sender instance using the given address
     *
+    * @param address the address to which the sender will produce its messages.
+    * @param desiredCapabilities the capabilities that the caller wants the 
remote to support.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(final String address, Symbol[] 
desiredCapabilities) throws Exception {
+      return createSender(address, false, desiredCapabilities, null, null);
+   }
+
+
+   /**
+    * Create a sender instance using the given address
+    *
     * @param address   the address to which the sender will produce its 
messages.
     * @param presettle controls if the created sender produces message that 
have already been marked settled.
     * @return a newly created sender that is ready for use.
     * @throws Exception if an error occurs while creating the sender.
     */
    public AmqpSender createSender(final String address, boolean presettle) 
throws Exception {
+      return createSender(address, presettle, null, null, null);
+   }
+
+   /**
+    * Create a sender instance using the given address
+    *
+    * @param address   the address to which the sender will produce its 
messages.
+    * @param presettle controls if the created sender produces messages that 
have already been marked settled.
+    * @param desiredCapabilities the capabilities that the caller wants the 
remote to support.
+    * @param offeredCapabilities the capabilities that the caller wants to 
advertise support for.
+    * @param properties the properties to send as part of the sender open.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(final String address, boolean presettle, 
Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map<Symbol, Object> 
properties) throws Exception {
       checkClosed();
 
       final AmqpSender sender = new AmqpSender(AmqpSession.this, address, 
getNextSenderId());
       sender.setPresettle(presettle);
+      sender.setDesiredCapabilities(desiredCapabilities);
+      sender.setOfferedCapabilities(offeredCapabilities);
+      sender.setProperties(properties);
       final ClientFuture request = new ClientFuture();
 
       connection.getScheduler().execute(new Runnable() {
@@ -150,9 +183,27 @@ public class AmqpSession extends 
AmqpAbstractResource<Session> {
     * @throws Exception if an error occurs while creating the receiver.
     */
    public AmqpSender createSender(Target target, String senderId) throws 
Exception {
+      return createSender(target, senderId, null, null, null);
+   }
+
+   /**
+    * Create a sender instance using the given Target
+    *
+    * @param target the caller created and configured Target used to create 
the sender link.
+    * @param senderId the sender ID to assign to the newly created Sender.
+    * @param desiredCapabilities the capabilities that the caller wants the 
remote to support.
+    * @param offeredCapabilities the capabilities that the caller wants to 
advertise support for.
+    * @param properties the properties to send as part of the sender open.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender(Target target, String senderId, Symbol[] 
desiredCapabilities, Symbol[] offeredCapabilities, Map<Symbol, Object> 
properties) throws Exception {
       checkClosed();
 
       final AmqpSender sender = new AmqpSender(AmqpSession.this, target, 
senderId);
+      sender.setDesiredCapabilities(desiredCapabilities);
+      sender.setOfferedCapabilities(offeredCapabilities);
+      sender.setProperties(properties);
       final ClientFuture request = new ClientFuture();
 
       connection.getScheduler().execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
index ac0e83e..7e4319d 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
@@ -273,4 +273,34 @@ public class UnmodifiableLink implements Link {
    public Map<Symbol, Object> getRemoteProperties() {
       return link.getRemoteProperties();
    }
+
+   @Override
+   public Symbol[] getDesiredCapabilities() {
+      return link.getDesiredCapabilities();
+   }
+
+   @Override
+   public Symbol[] getOfferedCapabilities() {
+      return link.getOfferedCapabilities();
+   }
+
+   @Override
+   public Symbol[] getRemoteDesiredCapabilities() {
+      return link.getRemoteDesiredCapabilities();
+   }
+
+   @Override
+   public Symbol[] getRemoteOfferedCapabilities() {
+      return link.getRemoteOfferedCapabilities();
+   }
+
+   @Override
+   public void setDesiredCapabilities(Symbol[] capabilities) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
+
+   @Override
+   public void setOfferedCapabilities(Symbol[] capabilities) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
index 92760db..f447d87 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.client.util;
 
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Receiver;
 
 /**
@@ -43,6 +44,11 @@ public class UnmodifiableReceiver extends UnmodifiableLink 
implements Receiver {
    }
 
    @Override
+   public int recv(WritableBuffer buffer) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
+
+   @Override
    public void drain(int credit) {
       throw new UnsupportedOperationException("Cannot alter the Link state");
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
index 89742cb..3c67f68 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.client.util;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Sender;
 
 /**
@@ -39,6 +40,11 @@ public class UnmodifiableSender extends UnmodifiableLink 
implements Sender {
    }
 
    @Override
+   public int send(ReadableBuffer buffer) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
+
+   @Override
    public void abort() {
       throw new UnsupportedOperationException("Cannot alter the Link state");
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
----------------------------------------------------------------------
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
index a44028e..3fc26cb 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.transport.amqp.client.util;
 
 import java.util.EnumSet;
+import java.util.Map;
 
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
@@ -147,4 +149,50 @@ public class UnmodifiableSession implements Session {
    public void setOutgoingWindow(long outgoingWindowSize) {
       throw new UnsupportedOperationException("Cannot alter the Session");
    }
+
+
+   @Override
+   public Symbol[] getDesiredCapabilities() {
+      return session.getDesiredCapabilities();
+   }
+
+   @Override
+   public Symbol[] getOfferedCapabilities() {
+      return session.getOfferedCapabilities();
+   }
+
+   @Override
+   public Map<Symbol, Object> getProperties() {
+      return session.getProperties();
+   }
+
+   @Override
+   public Symbol[] getRemoteDesiredCapabilities() {
+      return session.getRemoteDesiredCapabilities();
+   }
+
+   @Override
+   public Symbol[] getRemoteOfferedCapabilities() {
+      return session.getRemoteOfferedCapabilities();
+   }
+
+   @Override
+   public Map<Symbol, Object> getRemoteProperties() {
+      return session.getRemoteProperties();
+   }
+
+   @Override
+   public void setDesiredCapabilities(Symbol[] capabilities) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
+
+   @Override
+   public void setOfferedCapabilities(Symbol[] capabilities) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
+
+   @Override
+   public void setProperties(Map<Symbol, Object> capabilities) {
+      throw new UnsupportedOperationException("Cannot alter the Link state");
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af443d70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index aae2650..b817834 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import static 
org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
 import static 
org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
 import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -30,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -41,6 +43,7 @@ import 
org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -833,6 +836,33 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
       connection.close();
    }
 
+   @Test
+   public void testDeliveryDelayOfferedWhenRequested() throws Exception {
+      AmqpClient client = createAmqpClient();
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Sender sender) {
+
+            Symbol[] offered = sender.getRemoteOfferedCapabilities();
+            if (!contains(offered, AmqpSupport.DELAYED_DELIVERY)) {
+               markAsInvalid("Broker did not indicate it support delayed 
message delivery");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender("queue://" + getTestName(), new 
Symbol[] {AmqpSupport.DELAYED_DELIVERY});
+      assertNotNull(sender);
+
+      connection.getStateInspector().assertValid();
+
+      sender.close();
+      connection.close();
+   }
+
    public void sendMessages(String destinationName, int count) throws 
Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());

Reply via email to