ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e4ba48a3 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e4ba48a3 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e4ba48a3 Branch: refs/heads/master Commit: e4ba48a31193ac532404d93b37f29d2720f1a863 Parents: 515a2e0 Author: Michael André Pearce <[email protected]> Authored: Thu Dec 14 07:47:30 2017 +0000 Committer: Clebert Suconic <[email protected]> Committed: Thu Sep 27 17:29:18 2018 -0400 ---------------------------------------------------------------------- .../core/client/SendAcknowledgementHandler.java | 7 + .../client/ActiveMQClientMessageBundle.java | 3 + .../artemis/core/protocol/core/Channel.java | 3 + .../protocol/core/CoreRemotingConnection.java | 5 + .../artemis/core/protocol/core/Packet.java | 8 + .../core/protocol/core/ResponseHandler.java | 30 ++++ .../core/impl/ActiveMQSessionContext.java | 78 +++++++-- .../core/protocol/core/impl/ChannelImpl.java | 58 ++++++- .../core/protocol/core/impl/PacketDecoder.java | 34 +++- .../core/protocol/core/impl/PacketImpl.java | 21 +++ .../core/protocol/core/impl/ResponseCache.java | 70 ++++++++ .../wireformat/ActiveMQExceptionMessage.java | 2 +- .../wireformat/ActiveMQExceptionMessage_V2.java | 101 +++++++++++ .../impl/wireformat/CreateAddressMessage.java | 1 + .../impl/wireformat/CreateQueueMessage.java | 1 + .../wireformat/CreateSharedQueueMessage.java | 1 + .../impl/wireformat/NullResponseMessage_V2.java | 96 ++++++++++ .../wireformat/SessionAcknowledgeMessage.java | 1 + .../SessionCreateConsumerMessage.java | 1 + .../SessionIndividualAcknowledgeMessage.java | 1 + .../SessionSendContinuationMessage.java | 31 +++- .../SessionSendContinuationMessage_V2.java | 122 +++++++++++++ .../impl/wireformat/SessionSendMessage.java | 27 ++- .../impl/wireformat/SessionSendMessage_V2.java | 104 +++++++++++ .../wireformat/SessionXAResponseMessage.java | 6 +- .../wireformat/SessionXAResponseMessage_V2.java | 102 +++++++++++ .../main/resources/activemq-version.properties | 2 +- .../jms/client/ActiveMQMessageProducer.java | 30 ++++ .../core/protocol/ServerPacketDecoder.java | 5 +- .../core/ServerSessionPacketHandler.java | 173 ++++++++++++------- pom.xml | 2 +- .../cluster/util/BackupSyncDelay.java | 6 + .../JmsProducerCompletionListenerTest.java | 19 +- .../artemis/jms/tests/SecurityTest.java | 82 ++++++++- tests/jms-tests/src/test/resources/broker.xml | 10 ++ 35 files changed, 1134 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java index c164f6c..0f47536 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java @@ -41,4 +41,11 @@ public interface SendAcknowledgementHandler { * @param message message sent asynchronously */ void sendAcknowledged(Message message); + + default void sendFailed(Message message, Exception e) { + //This is to keep old behaviour that would ack even if error, + // if anyone custom implemented this interface but doesnt update. + sendAcknowledged(message); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java index bb88e6d..e043ac9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java @@ -228,4 +228,7 @@ public interface ActiveMQClientMessageBundle { @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.") ActiveMQInterruptedException packetTransmissionInterrupted(); + + @Message(id = 119063, value = "Cannot send a packet while response cache is full.") + IllegalStateException cannotSendPacketWhilstResponseCacheFull(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 127a69a..56f8259 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -211,6 +211,9 @@ public interface Channel { */ void setCommandConfirmationHandler(CommandConfirmationHandler handler); + void setResponseHandler(ResponseHandler handler); + + /** * flushes any confirmations on to the connection. */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index b6a5d93..74d9847 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -36,6 +36,11 @@ public interface CoreRemotingConnection extends RemotingConnection { return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION); } + default boolean isVersionBeforeAsyncResponseChange() { + int version = getChannelVersion(); + return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION); + } + /** * Sets the client protocol used on the communication. This will determine if the client has * support for certain packet types http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index 1f40314..b658090 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -41,6 +41,14 @@ public interface Packet { return INITIAL_PACKET_SIZE; } + boolean isRequiresResponse(); + + boolean isResponseAsync(); + + long getCorrelationID(); + + void setCorrelationID(long correlationID); + /** * Returns the channel id of the channel that should handle this packet. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java new file mode 100644 index 0000000..21e9879 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core; + +/** + * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets. + */ +public interface ResponseHandler { + + /** + * called by channel after a confirmation has been received. + * + * @param packet the packet confirmed + */ + void responseHandler(Packet packet, Packet response); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 18227cb..3c0647f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; @@ -99,9 +100,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -168,7 +171,11 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.setHandler(handler); if (confirmationWindow >= 0) { - sessionChannel.setCommandConfirmationHandler(confirmationHandler); + if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { + sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); + } else { + sessionChannel.setResponseHandler(responseHandler); + } } } @@ -185,28 +192,50 @@ public class ActiveMQSessionContext extends SessionContext { this.killed = true; } - private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { + private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() { + @Override + public void commandConfirmed(Packet packet) { + responseHandler.responseHandler(packet, null); + } + }; + + private final ResponseHandler responseHandler = new ResponseHandler() { @Override - public void commandConfirmed(final Packet packet) { + public void responseHandler(Packet packet, Packet response) { + final ActiveMQException activeMQException; + if (response != null && response.getType() == PacketImpl.EXCEPTION) { + ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response; + activeMQException = exceptionResponseMessage.getException(); + } else { + activeMQException = null; + } + if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage ssm = (SessionSendMessage) packet; - callSendAck(ssm.getHandler(), ssm.getMessage()); + callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException); } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) { SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; if (!scm.isContinues()) { - callSendAck(scm.getHandler(), scm.getMessage()); + callSendAck(scm.getHandler(), scm.getMessage(), activeMQException); } } } - private void callSendAck(SendAcknowledgementHandler handler, final Message message) { + private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) { if (handler != null) { - handler.sendAcknowledged(message); + if (exception == null) { + handler.sendAcknowledged(message); + } else { + handler.sendFailed(message, exception); + } } else if (sendAckHandler != null) { - sendAckHandler.sendAcknowledged(message); + if (exception == null) { + sendAckHandler.sendAcknowledged(message); + } else { + handler.sendFailed(message, exception); + } } } - }; // Failover utility methods @@ -243,7 +272,11 @@ public class ActiveMQSessionContext extends SessionContext { @Override public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { - sessionChannel.setCommandConfirmationHandler(confirmationHandler); + if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { + sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); + } else { + sessionChannel.setResponseHandler(responseHandler); + } this.sendAckHandler = handler; } @@ -472,13 +505,15 @@ public class ActiveMQSessionContext extends SessionContext { boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { - SessionSendMessage packet; + final SessionSendMessage packet; if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { packet = new SessionSendMessage_1X(msgI, sendBlocking, handler); - } else { + } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { packet = new SessionSendMessage(msgI, sendBlocking, handler); + } else { + boolean responseRequired = confirmationWindow != -1 || sendBlocking; + packet = new SessionSendMessage_V2(msgI, responseRequired, handler); } - if (sendBlocking) { sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); } else { @@ -904,15 +939,20 @@ public class ActiveMQSessionContext extends SessionContext { } } - private static int sendSessionSendContinuationMessage(Channel channel, + private int sendSessionSendContinuationMessage(Channel channel, Message msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { - final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + final boolean requiresResponse = lastChunk || confirmationWindow != -1; + final SessionSendContinuationMessage chunkPacket; + if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { + chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + } else { + chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + } final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); //perform a weak form of flow control to avoid OOM on tight loops final CoreRemotingConnection connection = channel.getConnection(); @@ -929,7 +969,11 @@ public class ActiveMQSessionContext extends SessionContext { } if (requiresResponse) { // When sending it blocking, only the last chunk will be blocking. - channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + if (sendBlocking) { + channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } else { + channel.send(chunkPacket); + } } else { channel.send(chunkPacket); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 4d73cf8..9cb2a83 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -96,6 +97,8 @@ public final class ChannelImpl implements Channel { private final java.util.Queue<Packet> resendCache; + private final ResponseCache responseAsyncCache; + private int firstStoredCommandID; private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1); @@ -138,8 +141,10 @@ public final class ChannelImpl implements Channel { if (confWindowSize != -1) { resendCache = new ConcurrentLinkedQueue<>(); + responseAsyncCache = new ResponseCache(); } else { resendCache = null; + responseAsyncCache = null; } this.interceptors = interceptors; @@ -211,7 +216,11 @@ public final class ChannelImpl implements Channel { lock.lock(); try { - response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause)); + ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause); + if (responseAsyncCache != null) { + responseAsyncCache.errorAll(activeMQException); + } + response = new ActiveMQExceptionMessage(activeMQException); sendCondition.signal(); } finally { @@ -270,6 +279,10 @@ public final class ChannelImpl implements Channel { synchronized (sendLock) { packet.setChannelID(id); + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); + } + if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); } @@ -291,6 +304,7 @@ public final class ChannelImpl implements Channel { if (resendCache != null && packet.isRequiresConfirmations()) { addResendPacket(packet); } + } finally { lock.unlock(); } @@ -301,9 +315,30 @@ public final class ChannelImpl implements Channel { checkReconnectID(reconnectID); + //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, + //As the send could block if the response cache is cannot add, preventing responses to be handled. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + while (!responseAsyncCache.add(packet)) { + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore + } + } + } + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover - connection.getTransportConnection().write(buffer, flush, batch); + try { + connection.getTransportConnection().write(buffer, flush, batch); + } catch (Throwable t) { + //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. + //The client would get still know about this as the exception bubbles up the call stack instead. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + responseAsyncCache.remove(packet.getCorrelationID()); + } + throw t; + } return true; } } @@ -478,6 +513,18 @@ public final class ChannelImpl implements Channel { } @Override + public void setResponseHandler(final ResponseHandler responseHandler) { + if (confWindowSize < 0) { + final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information."; + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg); + } + throw new IllegalStateException(msg); + } + responseAsyncCache.setResponseHandler(responseHandler); + } + + @Override public void setHandler(final ChannelHandler handler) { if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler); @@ -595,6 +642,12 @@ public final class ChannelImpl implements Channel { } } + public void handleResponse(Packet packet) { + if (responseAsyncCache != null && packet.isResponseAsync()) { + responseAsyncCache.handleResponse(packet); + } + } + @Override public void confirm(final Packet packet) { if (resendCache != null && packet.isRequiresConfirmations()) { @@ -647,6 +700,7 @@ public final class ChannelImpl implements Channel { if (packet.isResponse()) { confirm(packet); + handleResponse(packet); lock.lock(); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 5e46848..9a8166e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping; @@ -71,6 +72,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -81,6 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; @@ -88,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY; @@ -184,13 +188,25 @@ public abstract class PacketDecoder implements Serializable { break; } case EXCEPTION: { - packet = new ActiveMQExceptionMessage(); + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new ActiveMQExceptionMessage(); + } else { + packet = new ActiveMQExceptionMessage_V2(); + } break; } case PACKETS_CONFIRMED: { packet = new PacketsConfirmedMessage(); break; } + case NULL_RESPONSE: { + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new NullResponseMessage(); + } else { + packet = new NullResponseMessage_V2(); + } + break; + } case CREATESESSION: { packet = new CreateSessionMessage(); break; @@ -316,7 +332,11 @@ public abstract class PacketDecoder implements Serializable { break; } case SESS_XA_RESP: { - packet = new SessionXAResponseMessage(); + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new SessionXAResponseMessage(); + } else { + packet = new SessionXAResponseMessage_V2(); + } break; } case SESS_XA_ROLLBACK: { @@ -383,16 +403,16 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionIndividualAcknowledgeMessage(); break; } - case NULL_RESPONSE: { - packet = new NullResponseMessage(); - break; - } case SESS_RECEIVE_CONTINUATION: { packet = new SessionReceiveContinuationMessage(); break; } case SESS_SEND_CONTINUATION: { - packet = new SessionSendContinuationMessage(); + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new SessionSendContinuationMessage(); + } else { + packet = new SessionSendContinuationMessage_V2(); + } break; } case SESS_PRODUCER_REQUEST_CREDITS: { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 87ba0c3..470e3ae 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -32,6 +32,7 @@ public class PacketImpl implements Packet { // 2.0.0 public static final int ADDRESSING_CHANGE_VERSION = 129; public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130; + public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130; public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); @@ -272,6 +273,7 @@ public class PacketImpl implements Packet { public static final byte SESS_BINDINGQUERY_RESP_V4 = -15; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { @@ -439,5 +441,24 @@ public class PacketImpl implements Packet { return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0); } + @Override + public boolean isRequiresResponse() { + return false; + } + + @Override + public boolean isResponseAsync() { + return false; + } + + @Override + public long getCorrelationID() { + return -1; + } + + @Override + public void setCorrelationID(long correlationID) { + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java new file mode 100644 index 0000000..f9e8538 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; + +public class ResponseCache { + + private final AtomicLong sequence = new AtomicLong(0); + + private final ConcurrentLongHashMap<Packet> store; + private ResponseHandler responseHandler; + + public ResponseCache() { + this.store = new ConcurrentLongHashMap<>(); + } + + public long nextCorrelationID() { + return sequence.incrementAndGet(); + } + + public boolean add(Packet packet) { + this.store.put(packet.getCorrelationID(), packet); + return true; + } + + public Packet remove(long correlationID) { + return store.remove(correlationID); + } + + public void handleResponse(Packet response) { + long correlationID = response.getCorrelationID(); + Packet packet = remove(correlationID); + if (packet != null) { + responseHandler.responseHandler(packet, response); + } + } + + public void errorAll(ActiveMQException exception) { + ConcurrentLongHashSet keys = store.keysLongHashSet(); + keys.forEach(correlationID -> { + handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception)); + }); + } + + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java index da34d2e..51637f3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class ActiveMQExceptionMessage extends PacketImpl { - private ActiveMQException exception; + protected ActiveMQException exception; // Static -------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java new file mode 100644 index 0000000..661a040 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.utils.DataConstants; + +public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage { + + private long correlationID; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) { + super(exception); + this.correlationID = correlationID; + } + + public ActiveMQExceptionMessage_V2() { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public boolean isResponse() { + return true; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public final boolean isResponseAsync() { + return true; + } + + @Override + public long getCorrelationID() { + return this.correlationID; + } + + @Override + public String toString() { + return getParentString() + ", exception= " + exception + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof ActiveMQExceptionMessage_V2)) { + return false; + } + ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java index a98f888..8c84a9b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java @@ -65,6 +65,7 @@ public class CreateAddressMessage extends PacketImpl { return address; } + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java index 2ebf147..985d5f4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java @@ -100,6 +100,7 @@ public class CreateQueueMessage extends PacketImpl { return temporary; } + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java index af25ae9..3c072e0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java @@ -80,6 +80,7 @@ public class CreateSharedQueueMessage extends PacketImpl { return filterString; } + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java new file mode 100644 index 0000000..e3453af --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.utils.DataConstants; + +public class NullResponseMessage_V2 extends NullResponseMessage { + + private long correlationID; + + public NullResponseMessage_V2(final long correlationID) { + super(); + this.correlationID = correlationID; + } + + public NullResponseMessage_V2() { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public long getCorrelationID() { + return correlationID; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public final boolean isResponse() { + return true; + } + + @Override + public final boolean isResponseAsync() { + return true; + } + + @Override + public String toString() { + return getParentString() + ", correlationID=" + correlationID + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof NullResponseMessage_V2)) { + return false; + } + NullResponseMessage_V2 other = (NullResponseMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java index 542c34c..67d9f67 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java @@ -51,6 +51,7 @@ public class SessionAcknowledgeMessage extends PacketImpl { return messageID; } + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java index f09beeb..e07b50c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java @@ -71,6 +71,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { return browseOnly; } + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java index 7d06081..3164c23 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java @@ -60,6 +60,7 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl { return messageID; } + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index 26eedd7..4105b11 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants; */ public class SessionSendContinuationMessage extends SessionContinuationMessage { - private boolean requiresResponse; + protected boolean requiresResponse; // Used on confirmation handling - private Message message; + protected Message message; /** * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} * <br> @@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { /** * to be sent on the last package */ - private long messageBodySize = -1; + protected long messageBodySize = -1; // Static -------------------------------------------------------- @@ -54,6 +54,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { handler = null; } + protected SessionSendContinuationMessage(byte type) { + super(type); + handler = null; + } + /** * @param body * @param continues @@ -72,11 +77,31 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { this.messageBodySize = messageBodySize; } + /** + * @param body + * @param continues + * @param requiresResponse + */ + protected SessionSendContinuationMessage(final byte type, + final Message message, + final byte[] body, + final boolean continues, + final boolean requiresResponse, + final long messageBodySize, + SendAcknowledgementHandler handler) { + super(type, body, continues); + this.requiresResponse = requiresResponse; + this.message = message; + this.handler = handler; + this.messageBodySize = messageBodySize; + } + // Public -------------------------------------------------------- /** * @return the requiresResponse */ + @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java new file mode 100644 index 0000000..2a3071c --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.utils.DataConstants; + +/** + * A SessionSendContinuationMessage<br> + */ +public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage { + + private long correlationID; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public SessionSendContinuationMessage_V2() { + super(); + } + + /** + * @param body + * @param continues + * @param requiresResponse + */ + public SessionSendContinuationMessage_V2(final Message message, + final byte[] body, + final boolean continues, + final boolean requiresResponse, + final long messageBodySize, + SendAcknowledgementHandler handler) { + super(message, body, continues, requiresResponse, messageBodySize, handler); + } + + // Public -------------------------------------------------------- + + @Override + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_LONG; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public long getCorrelationID() { + return this.correlationID; + } + + @Override + public void setCorrelationID(long correlationID) { + this.correlationID = correlationID; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", continues=" + continues); + buff.append(", message=" + message); + buff.append(", messageBodySize=" + messageBodySize); + buff.append(", requiresResponse=" + requiresResponse); + buff.append(", correlationID=" + correlationID); + buff.append("]"); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionSendContinuationMessage_V2)) + return false; + SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj; + if (correlationID != other.correlationID) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index b56ae30..e8dbdc1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.DataConstants; public class SessionSendMessage extends MessagePacket { @@ -37,6 +38,22 @@ public class SessionSendMessage extends MessagePacket { private final transient SendAcknowledgementHandler handler; /** This will be using the CoreMessage because it is meant for the core-protocol */ + protected SessionSendMessage(final byte id, + final ICoreMessage message, + final boolean requiresResponse, + final SendAcknowledgementHandler handler) { + super(id, message); + this.handler = handler; + this.requiresResponse = requiresResponse; + } + + protected SessionSendMessage(final byte id, + final CoreMessage message) { + super(id, message); + this.handler = null; + } + + /** This will be using the CoreMessage because it is meant for the core-protocol */ public SessionSendMessage(final ICoreMessage message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { @@ -52,6 +69,7 @@ public class SessionSendMessage extends MessagePacket { // Public -------------------------------------------------------- + @Override public boolean isRequiresResponse() { return requiresResponse; } @@ -62,7 +80,7 @@ public class SessionSendMessage extends MessagePacket { @Override public int expectedEncodeSize() { - return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1; + return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize(); } @Override @@ -75,13 +93,16 @@ public class SessionSendMessage extends MessagePacket { public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part - ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); + ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize()); receiveMessage(messageBuffer); - buffer.readerIndex(buffer.capacity() - 1); + buffer.readerIndex(buffer.capacity() - fieldsEncodeSize()); requiresResponse = buffer.readBoolean(); + } + protected int fieldsEncodeSize() { + return DataConstants.SIZE_BOOLEAN; } protected void receiveMessage(ByteBuf messageBuffer) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java new file mode 100644 index 0000000..63c9a34 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.DataConstants; + +public class SessionSendMessage_V2 extends SessionSendMessage { + + private long correlationID; + + /** This will be using the CoreMessage because it is meant for the core-protocol */ + public SessionSendMessage_V2(final ICoreMessage message, + final boolean requiresResponse, + final SendAcknowledgementHandler handler) { + super(SESS_SEND, message, requiresResponse, handler); + } + + public SessionSendMessage_V2(final CoreMessage message) { + super(SESS_SEND, message); + } + + @Override + public void encodeRest(ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + correlationID = buffer.readLong(); + } + + @Override + protected int fieldsEncodeSize() { + return super.fieldsEncodeSize() + DataConstants.SIZE_LONG; + } + + @Override + public long getCorrelationID() { + return this.correlationID; + } + + @Override + public void setCorrelationID(long correlationID) { + this.correlationID = correlationID; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", correlationID=" + correlationID); + buff.append(", requiresResponse=" + super.isRequiresResponse()); + buff.append("]"); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionSendMessage_V2)) + return false; + SessionSendMessage_V2 other = (SessionSendMessage_V2) obj; + if (correlationID != other.correlationID) + return false; + return true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java index 086b851..f88e0c8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java @@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionXAResponseMessage extends PacketImpl { - private boolean error; + protected boolean error; - private int responseCode; + protected int responseCode; - private String message; + protected String message; public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) { super(SESS_XA_RESP); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java new file mode 100644 index 0000000..4e949bd --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.utils.DataConstants; + +public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage { + + private long correlationID; + + public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) { + super(isError, responseCode, message); + this.correlationID = correlationID; + } + + public SessionXAResponseMessage_V2() { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public long getCorrelationID() { + return correlationID; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public final boolean isResponse() { + return true; + } + + @Override + public final boolean isResponseAsync() { + return true; + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", error=" + error); + buff.append(", message=" + message); + buff.append(", responseCode=" + responseCode); + buff.append(", correlationID=" + correlationID); + buff.append("]"); + return buff.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof SessionXAResponseMessage_V2)) { + return false; + } + SessionXAResponseMessage_V2 other = (SessionXAResponseMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/resources/activemq-version.properties ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index a39b422..ff65ff9 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index ae1d270..fc15d5e 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -601,6 +601,36 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To } @Override + public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) { + if (jmsMessage instanceof StreamMessage) { + try { + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } + } + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } + } + + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + if (exception instanceof ActiveMQException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception); + } else if (exception instanceof ActiveMQInterruptedException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); + } + completionListener.onException(jmsMessage, exception); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + } + } + + @Override public String toString() { return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index d38f45f..0428abe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -90,8 +91,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder { if (connection.isVersionBeforeAddressChange()) { sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools)); - } else { + } else if (connection.isVersionBeforeAsyncResponseChange()) { sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools)); + } else { + sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools)); } sendMessage.decode(in);
