ARTEMIS-1938 Update proton-j to 0.30.0 and Qpid JMS 0.37.0. Update to the latest proton-j release and refactor the disposition code to use the new type enums to better deal with the dispositions. Updates to Qpid JMS 0.37.0 which still uses the current netty 4.1.28.Final dependency.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/593348b9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/593348b9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/593348b9 Branch: refs/heads/master Commit: 593348b9ada7e7bec4b62b2b65d537b2ac25dd29 Parents: 9263bb4 Author: Timothy Bish <[email protected]> Authored: Wed Nov 14 16:31:46 2018 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Nov 15 20:18:37 2018 -0500 ---------------------------------------------------------------------- .../amqp/proton/ProtonServerSenderContext.java | 83 ++++++++++++-------- .../protocol/amqp/util/NettyWritable.java | 5 ++ pom.xml | 4 +- 3 files changed, 58 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 24dcff0..c4aca48 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -56,14 +56,13 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Outcome; -import 
org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; @@ -546,22 +545,45 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr Message message = ((MessageReference) delivery.getContext()).getMessage(); DeliveryState remoteState = delivery.getRemoteState(); - boolean settleImmediate = true; - if (remoteState instanceof Accepted) { + if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) { // this can happen in the twice ack mode, that is the receiver accepts and settles separately // acking again would show an exception but would have no negative effect but best to handle anyway. 
- if (delivery.isSettled()) { - return; - } - // we have to individual ack as we can't guarantee we will get the delivery updates - // (including acks) in order from dealer, a performance hit but a must - try { - sessionSPI.ack(null, brokerConsumer, message); - } catch (Exception e) { - log.warn(e.toString(), e); - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + if (!delivery.isSettled()) { + // we have to individual ack as we can't guarantee we will get the delivery updates + // (including acks) in order from dealer, a performance hit but a must + try { + sessionSPI.ack(null, brokerConsumer, message); + } catch (Exception e) { + log.warn(e.toString(), e); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + + delivery.settle(); } - } else if (remoteState instanceof TransactionalState) { + } else { + handleExtendedDeliveryOutcomes(message, delivery, remoteState); + } + + if (!preSettle) { + protonSession.replaceTag(delivery.getTag()); + } + } finally { + sessionSPI.afterIO(connectionFlusher); + sessionSPI.resetContext(oldContext); + } + } + + private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException { + boolean settleImmediate = true; + boolean handled = true; + + if (remoteState == null) { + log.debug("Received null disposition for delivery update: " + remoteState); + return true; + } + + switch (remoteState.getType()) { + case Transactional: // When the message arrives with a TransactionState disposition the ack should // enlist the message into the transaction associated with the given txn ID. 
TransactionalState txState = (TransactionalState) remoteState; @@ -587,19 +609,22 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } } - } else if (remoteState instanceof Released) { + break; + case Released: try { sessionSPI.cancel(brokerConsumer, message, false); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - } else if (remoteState instanceof Rejected) { + break; + case Rejected: try { sessionSPI.reject(brokerConsumer, message); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - } else if (remoteState instanceof Modified) { + break; + case Modified: try { Modified modification = (Modified) remoteState; @@ -615,23 +640,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - } else { + break; + default: log.debug("Received null or unknown disposition for delivery update: " + remoteState); - return; - } - - if (!preSettle) { - protonSession.replaceTag(delivery.getTag()); - } - - if (settleImmediate) { - delivery.settle(); - } + handled = false; + } - } finally { - sessionSPI.afterIO(connectionFlusher); - sessionSPI.resetContext(oldContext); + if (settleImmediate) { + delivery.settle(); } + + return handled; } private final class ConnectionFlushIOCallback implements IOCallback { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java index d752bd7..6f363db 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java @@ -86,6 +86,11 @@ public class NettyWritable implements WritableBuffer { } @Override + public void ensureRemaining(int remaining) { + nettyBuffer.ensureWritable(remaining); + } + + @Override public int position() { return nettyBuffer.writerIndex(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 87acfca..f435121 100644 --- a/pom.xml +++ b/pom.xml @@ -92,10 +92,10 @@ <mockito.version>2.8.47</mockito.version> <netty.version>4.1.28.Final</netty.version> <netty-tcnative-version>2.0.12.Final</netty-tcnative-version> - <proton.version>0.29.0</proton.version> + <proton.version>0.30.0</proton.version> <resteasy.version>3.0.19.Final</resteasy.version> <slf4j.version>1.7.21</slf4j.version> - <qpid.jms.version>0.36.0</qpid.jms.version> + <qpid.jms.version>0.37.0</qpid.jms.version> <johnzon.version>0.9.5</johnzon.version> <json-p.spec.version>1.0-alpha-1</json-p.spec.version> <javax.inject.version>1</javax.inject.version>
