ARTEMIS-1938 Update proton-j to 0.30.0 and Qpid JMS 0.37.0

Update to the latest proton-j release and refactor the disposition code to use
the new type enums to better deal with the dispositions.  Also updates Qpid JMS
to 0.37.0, which still uses the current netty 4.1.28.Final dependency.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/593348b9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/593348b9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/593348b9

Branch: refs/heads/master
Commit: 593348b9ada7e7bec4b62b2b65d537b2ac25dd29
Parents: 9263bb4
Author: Timothy Bish <[email protected]>
Authored: Wed Nov 14 16:31:46 2018 -0500
Committer: Clebert Suconic <[email protected]>
Committed: Thu Nov 15 20:18:37 2018 -0500

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  | 83 ++++++++++++--------
 .../protocol/amqp/util/NettyWritable.java       |  5 ++
 pom.xml                                         |  4 +-
 3 files changed, 58 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 24dcff0..c4aca48 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -56,14 +56,13 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
@@ -546,22 +545,45 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          Message message = ((MessageReference) 
delivery.getContext()).getMessage();
          DeliveryState remoteState = delivery.getRemoteState();
 
-         boolean settleImmediate = true;
-         if (remoteState instanceof Accepted) {
+         if (remoteState != null && remoteState.getType() == 
DeliveryStateType.Accepted) {
             // this can happen in the twice ack mode, that is the receiver 
accepts and settles separately
             // acking again would show an exception but would have no negative 
effect but best to handle anyway.
-            if (delivery.isSettled()) {
-               return;
-            }
-            // we have to individual ack as we can't guarantee we will get the 
delivery updates
-            // (including acks) in order from dealer, a performance hit but a 
must
-            try {
-               sessionSPI.ack(null, brokerConsumer, message);
-            } catch (Exception e) {
-               log.warn(e.toString(), e);
-               throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(),
 e.getMessage());
+            if (!delivery.isSettled()) {
+               // we have to individual ack as we can't guarantee we will get 
the delivery updates
+               // (including acks) in order from dealer, a performance hit but 
a must
+               try {
+                  sessionSPI.ack(null, brokerConsumer, message);
+               } catch (Exception e) {
+                  log.warn(e.toString(), e);
+                  throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(),
 e.getMessage());
+               }
+
+               delivery.settle();
             }
-         } else if (remoteState instanceof TransactionalState) {
+         } else {
+            handleExtendedDeliveryOutcomes(message, delivery, remoteState);
+         }
+
+         if (!preSettle) {
+            protonSession.replaceTag(delivery.getTag());
+         }
+      } finally {
+         sessionSPI.afterIO(connectionFlusher);
+         sessionSPI.resetContext(oldContext);
+      }
+   }
+
+   private boolean handleExtendedDeliveryOutcomes(Message message, Delivery 
delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
+      boolean settleImmediate = true;
+      boolean handled = true;
+
+      if (remoteState == null) {
+         log.debug("Received null disposition for delivery update: " + 
remoteState);
+         return true;
+      }
+
+      switch (remoteState.getType()) {
+         case Transactional:
             // When the message arrives with a TransactionState disposition 
the ack should
             // enlist the message into the transaction associated with the 
given txn ID.
             TransactionalState txState = (TransactionalState) remoteState;
@@ -587,19 +609,22 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                   }
                }
             }
-         } else if (remoteState instanceof Released) {
+            break;
+         case Released:
             try {
                sessionSPI.cancel(brokerConsumer, message, false);
             } catch (Exception e) {
                throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
 e.getMessage());
             }
-         } else if (remoteState instanceof Rejected) {
+            break;
+         case Rejected:
             try {
                sessionSPI.reject(brokerConsumer, message);
             } catch (Exception e) {
                throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
 e.getMessage());
             }
-         } else if (remoteState instanceof Modified) {
+            break;
+         case Modified:
             try {
                Modified modification = (Modified) remoteState;
 
@@ -615,23 +640,17 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             } catch (Exception e) {
                throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
 e.getMessage());
             }
-         } else {
+            break;
+         default:
             log.debug("Received null or unknown disposition for delivery 
update: " + remoteState);
-            return;
-         }
-
-         if (!preSettle) {
-            protonSession.replaceTag(delivery.getTag());
-         }
-
-         if (settleImmediate) {
-            delivery.settle();
-         }
+            handled = false;
+      }
 
-      } finally {
-         sessionSPI.afterIO(connectionFlusher);
-         sessionSPI.resetContext(oldContext);
+      if (settleImmediate) {
+         delivery.settle();
       }
+
+      return handled;
    }
 
    private final class ConnectionFlushIOCallback implements IOCallback {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
index d752bd7..6f363db 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
@@ -86,6 +86,11 @@ public class NettyWritable implements WritableBuffer {
    }
 
    @Override
+   public void ensureRemaining(int remaining) {
+      nettyBuffer.ensureWritable(remaining);
+   }
+
+   @Override
    public int position() {
       return nettyBuffer.writerIndex();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 87acfca..f435121 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,10 +92,10 @@
       <mockito.version>2.8.47</mockito.version>
       <netty.version>4.1.28.Final</netty.version>
       <netty-tcnative-version>2.0.12.Final</netty-tcnative-version>
-      <proton.version>0.29.0</proton.version>
+      <proton.version>0.30.0</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
-      <qpid.jms.version>0.36.0</qpid.jms.version>
+      <qpid.jms.version>0.37.0</qpid.jms.version>
       <johnzon.version>0.9.5</johnzon.version>
       <json-p.spec.version>1.0-alpha-1</json-p.spec.version>
       <javax.inject.version>1</javax.inject.version>

Reply via email to