Reverting JMS Completion listener on 2.6.x. This change is too big for a maintenance branch. Reverting it.
Revert "ARTEMIS-1545 refactor & rework a few incompatible pieces" Revert "ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions" This reverts commit c9d8697a6cf5e4620970da878fc5ab4f8d9d148f. This reverts commit f4734868a5a07dfc6db533a96f9f8e01de5139c5. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2242d244 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2242d244 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2242d244 Branch: refs/heads/2.6.x Commit: 2242d2447cdaaf3f7ad59f1b9c342035e9b8426e Parents: da7fb89 Author: Clebert Suconic <[email protected]> Authored: Fri Sep 28 09:15:16 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Sep 28 09:16:58 2018 -0400 ---------------------------------------------------------------------- .../core/client/SendAcknowledgementHandler.java | 9 - .../client/ActiveMQClientMessageBundle.java | 3 - .../artemis/core/protocol/core/Channel.java | 3 - .../protocol/core/CoreRemotingConnection.java | 5 - .../artemis/core/protocol/core/Packet.java | 8 - .../core/protocol/core/ResponseHandler.java | 30 -- .../core/impl/ActiveMQSessionContext.java | 71 +-- .../core/protocol/core/impl/ChannelImpl.java | 67 +-- .../core/protocol/core/impl/PacketDecoder.java | 34 +- .../core/protocol/core/impl/PacketImpl.java | 26 +- .../core/protocol/core/impl/ResponseCache.java | 74 --- .../wireformat/ActiveMQExceptionMessage.java | 2 +- .../wireformat/ActiveMQExceptionMessage_V2.java | 101 ---- .../impl/wireformat/CreateAddressMessage.java | 1 - .../impl/wireformat/CreateQueueMessage.java | 1 - .../wireformat/CreateSharedQueueMessage.java | 1 - .../impl/wireformat/NullResponseMessage_V2.java | 96 ---- .../wireformat/SessionAcknowledgeMessage.java | 1 - .../SessionCreateConsumerMessage.java | 1 - .../SessionIndividualAcknowledgeMessage.java | 1 - .../SessionSendContinuationMessage.java | 31 +- 
.../SessionSendContinuationMessage_V2.java | 122 ----- .../impl/wireformat/SessionSendMessage.java | 27 +- .../impl/wireformat/SessionSendMessage_V2.java | 104 ---- .../wireformat/SessionXAResponseMessage.java | 6 +- .../wireformat/SessionXAResponseMessage_V2.java | 102 ---- .../main/resources/activemq-version.properties | 2 +- .../protocol/core/impl/ChannelImplTest.java | 512 ------------------- .../jms/client/ActiveMQMessageProducer.java | 76 +-- .../core/protocol/ServerPacketDecoder.java | 5 +- .../core/ServerSessionPacketHandler.java | 176 +++---- pom.xml | 2 +- .../cluster/util/BackupSyncDelay.java | 6 - .../JmsProducerCompletionListenerTest.java | 19 +- .../artemis/jms/tests/SecurityTest.java | 189 +------ tests/jms-tests/src/test/resources/broker.xml | 10 - 36 files changed, 126 insertions(+), 1798 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java index ad45a5f..c164f6c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java @@ -41,13 +41,4 @@ public interface SendAcknowledgementHandler { * @param message message sent asynchronously */ void sendAcknowledged(Message message); - - default void sendFailed(Message message, Exception e) { - /** - * By default ignore failures to preserve compatibility with existing implementations. 
- * If the message makes it to the broker and a failure occurs sendAcknowledge() will - * still be invoked just like it always was. - */ - } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java index e043ac9..bb88e6d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java @@ -228,7 +228,4 @@ public interface ActiveMQClientMessageBundle { @Message(id = 119062, value = "Multi-packet transmission (e.g. 
Large Messages) interrupted because of a reconnection.") ActiveMQInterruptedException packetTransmissionInterrupted(); - - @Message(id = 119063, value = "Cannot send a packet while response cache is full.") - IllegalStateException cannotSendPacketWhilstResponseCacheFull(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 56f8259..127a69a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -211,9 +211,6 @@ public interface Channel { */ void setCommandConfirmationHandler(CommandConfirmationHandler handler); - void setResponseHandler(ResponseHandler handler); - - /** * flushes any confirmations on to the connection. 
*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 74d9847..b6a5d93 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -36,11 +36,6 @@ public interface CoreRemotingConnection extends RemotingConnection { return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION); } - default boolean isVersionBeforeAsyncResponseChange() { - int version = getChannelVersion(); - return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION); - } - /** * Sets the client protocol used on the communication. 
This will determine if the client has * support for certain packet types http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index b658090..1f40314 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -41,14 +41,6 @@ public interface Packet { return INITIAL_PACKET_SIZE; } - boolean isRequiresResponse(); - - boolean isResponseAsync(); - - long getCorrelationID(); - - void setCorrelationID(long correlationID); - /** * Returns the channel id of the channel that should handle this packet. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java deleted file mode 100644 index f96ef13..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.core; - -/** - * A ResponseHandler is used by the channel to handle async responses. - */ -public interface ResponseHandler { - - /** - * called by channel after an async response has been received. - * - * @param packet the packet confirmed - */ - void handleResponse(Packet packet, Packet response); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 658bfcf..cbbfcab 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -55,7 +55,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import 
org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; @@ -91,11 +90,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -168,7 +165,7 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.setHandler(handler); if (confirmationWindow >= 0) { - setHandlers(); + sessionChannel.setCommandConfirmationHandler(confirmationHandler); } } @@ -185,58 +182,28 @@ public class ActiveMQSessionContext extends SessionContext { this.killed = true; } - private void setHandlers() { - 
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); - - if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { - sessionChannel.setResponseHandler(responseHandler); - } - } - - private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() { + private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { @Override - public void commandConfirmed(Packet packet) { - responseHandler.handleResponse(packet, null); - } - }; - - private final ResponseHandler responseHandler = new ResponseHandler() { - @Override - public void handleResponse(Packet packet, Packet response) { - final ActiveMQException activeMQException; - if (response != null && response.getType() == PacketImpl.EXCEPTION) { - ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response; - activeMQException = exceptionResponseMessage.getException(); - } else { - activeMQException = null; - } - + public void commandConfirmed(final Packet packet) { if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage ssm = (SessionSendMessage) packet; - callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException); + callSendAck(ssm.getHandler(), ssm.getMessage()); } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) { SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; if (!scm.isContinues()) { - callSendAck(scm.getHandler(), scm.getMessage(), activeMQException); + callSendAck(scm.getHandler(), scm.getMessage()); } } } - private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) { + private void callSendAck(SendAcknowledgementHandler handler, final Message message) { if (handler != null) { - if (exception == null) { - handler.sendAcknowledged(message); - } else { - handler.sendFailed(message, exception); - } + handler.sendAcknowledged(message); } else if (sendAckHandler != 
null) { - if (exception == null) { - sendAckHandler.sendAcknowledged(message); - } else { - sendAckHandler.sendFailed(message, exception); - } + sendAckHandler.sendAcknowledged(message); } } + }; // Failover utility methods @@ -273,8 +240,7 @@ public class ActiveMQSessionContext extends SessionContext { @Override public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { - setHandlers(); - + sessionChannel.setCommandConfirmationHandler(confirmationHandler); this.sendAckHandler = handler; } @@ -502,15 +468,13 @@ public class ActiveMQSessionContext extends SessionContext { boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { - final SessionSendMessage packet; + SessionSendMessage packet; if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { packet = new SessionSendMessage_1X(msgI, sendBlocking, handler); - } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { - packet = new SessionSendMessage(msgI, sendBlocking, handler); } else { - boolean responseRequired = confirmationWindow != -1 || sendBlocking; - packet = new SessionSendMessage_V2(msgI, responseRequired, handler); + packet = new SessionSendMessage(msgI, sendBlocking, handler); } + if (sendBlocking) { sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); } else { @@ -926,7 +890,7 @@ public class ActiveMQSessionContext extends SessionContext { } } - private int sendSessionSendContinuationMessage(Channel channel, + private static int sendSessionSendContinuationMessage(Channel channel, Message msgI, long messageBodySize, boolean sendBlocking, @@ -934,12 +898,7 @@ public class ActiveMQSessionContext extends SessionContext { byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket; - if 
(sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { - chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); - } else { - chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler); - } + final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); //perform a weak form of flow control to avoid OOM on tight loops final CoreRemotingConnection connection = channel.getConnection(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 61268d6..4d73cf8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -97,8 +96,6 @@ public final class ChannelImpl implements Channel { private final java.util.Queue<Packet> resendCache; - private final ResponseCache responseAsyncCache; - private int firstStoredCommandID; private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1); @@ -141,10 +138,8 @@ public final class ChannelImpl implements Channel { if (confWindowSize != -1) { resendCache = new ConcurrentLinkedQueue<>(); - responseAsyncCache = new ResponseCache(); } else { resendCache = null; - responseAsyncCache = null; } this.interceptors = interceptors; @@ -216,11 +211,7 @@ public final class ChannelImpl implements Channel { lock.lock(); try { - ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause); - if (responseAsyncCache != null) { - responseAsyncCache.errorAll(activeMQException); - } - response = new ActiveMQExceptionMessage(activeMQException); + response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause)); sendCondition.signal(); } finally { @@ -253,10 +244,6 @@ public final class ChannelImpl implements Channel { this.transferring = transferring; } - protected ResponseCache getCache() { - return responseAsyncCache; - } - /** * @param timeoutMsg message to log on blocking call failover timeout */ @@ -283,10 +270,6 @@ public final class ChannelImpl implements Channel { synchronized (sendLock) { packet.setChannelID(id); - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); - } - if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? 
"NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); } @@ -308,7 +291,6 @@ public final class ChannelImpl implements Channel { if (resendCache != null && packet.isRequiresConfirmations()) { addResendPacket(packet); } - } finally { lock.unlock(); } @@ -319,30 +301,9 @@ public final class ChannelImpl implements Channel { checkReconnectID(reconnectID); - //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, - //As the send could block if the response cache cannot add, preventing responses to be handled. - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - while (!responseAsyncCache.add(packet)) { - try { - Thread.sleep(1); - } catch (Exception e) { - // Ignore - } - } - } - // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover - try { - connection.getTransportConnection().write(buffer, flush, batch); - } catch (Throwable t) { - //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. - //The client would get still know about this as the exception bubbles up the call stack instead. 
- if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - responseAsyncCache.remove(packet.getCorrelationID()); - } - throw t; - } + connection.getTransportConnection().write(buffer, flush, batch); return true; } } @@ -430,7 +391,7 @@ public final class ChannelImpl implements Channel { throw new ActiveMQInterruptedException(e); } - if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) { + if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) { ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); } @@ -517,18 +478,6 @@ public final class ChannelImpl implements Channel { } @Override - public void setResponseHandler(final ResponseHandler responseHandler) { - if (confWindowSize < 0) { - final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information."; - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg); - } - throw new IllegalStateException(msg); - } - responseAsyncCache.setResponseHandler(responseHandler); - } - - @Override public void setHandler(final ChannelHandler handler) { if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? 
"NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler); @@ -646,12 +595,6 @@ public final class ChannelImpl implements Channel { } } - public void handleAsyncResponse(Packet packet) { - if (responseAsyncCache != null && packet.isResponseAsync()) { - responseAsyncCache.handleResponse(packet); - } - } - @Override public void confirm(final Packet packet) { if (resendCache != null && packet.isRequiresConfirmations()) { @@ -704,7 +647,6 @@ public final class ChannelImpl implements Channel { if (packet.isResponse()) { confirm(packet); - handleAsyncResponse(packet); lock.lock(); try { @@ -756,9 +698,6 @@ public final class ChannelImpl implements Channel { if (commandConfirmationHandler != null) { commandConfirmationHandler.commandConfirmed(packet); } - if (responseAsyncCache != null) { - responseAsyncCache.handleResponse(packet); - } } firstStoredCommandID += numberToClear; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 9a8166e..5e46848 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping; @@ -72,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -83,7 +81,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; @@ -91,7 +88,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY; @@ -188,25 +184,13 @@ public abstract class PacketDecoder implements Serializable { break; } case EXCEPTION: { - if (connection.isVersionBeforeAsyncResponseChange()) { - packet = new ActiveMQExceptionMessage(); - } else { - packet = new ActiveMQExceptionMessage_V2(); - } + packet = new ActiveMQExceptionMessage(); break; } case PACKETS_CONFIRMED: { packet = new PacketsConfirmedMessage(); break; } - case NULL_RESPONSE: { - if (connection.isVersionBeforeAsyncResponseChange()) { - packet = new NullResponseMessage(); - } else { - packet = new NullResponseMessage_V2(); - } - break; - } case CREATESESSION: { packet = new CreateSessionMessage(); break; @@ -332,11 +316,7 @@ public abstract class PacketDecoder implements Serializable { break; } case SESS_XA_RESP: { - if (connection.isVersionBeforeAsyncResponseChange()) { - packet = new SessionXAResponseMessage(); - } else { - packet = new SessionXAResponseMessage_V2(); - } + packet = new SessionXAResponseMessage(); break; } case SESS_XA_ROLLBACK: { @@ -403,16 +383,16 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionIndividualAcknowledgeMessage(); break; } + case NULL_RESPONSE: { + packet = new 
NullResponseMessage(); + break; + } case SESS_RECEIVE_CONTINUATION: { packet = new SessionReceiveContinuationMessage(); break; } case SESS_SEND_CONTINUATION: { - if (connection.isVersionBeforeAsyncResponseChange()) { - packet = new SessionSendContinuationMessage(); - } else { - packet = new SessionSendContinuationMessage_V2(); - } + packet = new SessionSendContinuationMessage(); break; } case SESS_PRODUCER_REQUEST_CREDITS: { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 0168a47..87ba0c3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -31,9 +31,7 @@ public class PacketImpl implements Packet { // 2.0.0 public static final int ADDRESSING_CHANGE_VERSION = 129; - - // 2.7.0 - public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130; + public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130; public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); @@ -274,7 +272,6 @@ public class PacketImpl implements Packet { public static final byte SESS_BINDINGQUERY_RESP_V4 = -15; - // Static -------------------------------------------------------- public PacketImpl(final byte type) { @@ -431,7 +428,7 @@ public class PacketImpl implements Packet { } protected String getParentString() { - return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + 
isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName(); + return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName(); } private int stringEncodeSize(final String str) { @@ -442,24 +439,5 @@ public class PacketImpl implements Packet { return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0); } - @Override - public boolean isRequiresResponse() { - return false; - } - - @Override - public boolean isResponseAsync() { - return false; - } - - @Override - public long getCorrelationID() { - return -1; - } - - @Override - public void setCorrelationID(long correlationID) { - } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java deleted file mode 100644 index 8ee73d7..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.core.impl; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; -import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; -import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; - -public class ResponseCache { - - private final AtomicLong sequence = new AtomicLong(0); - - private final ConcurrentLongHashMap<Packet> store; - private ResponseHandler responseHandler; - - public ResponseCache() { - this.store = new ConcurrentLongHashMap<>(); - } - - public long nextCorrelationID() { - return sequence.incrementAndGet(); - } - - public boolean add(Packet packet) { - this.store.put(packet.getCorrelationID(), packet); - return true; - } - - public Packet remove(long correlationID) { - return store.remove(correlationID); - } - - public void handleResponse(Packet response) { - long correlationID = response.getCorrelationID(); - Packet packet = remove(correlationID); - if (packet != null) { - responseHandler.handleResponse(packet, response); - } - } - - public void errorAll(ActiveMQException exception) { - ConcurrentLongHashSet keys = store.keysLongHashSet(); - keys.forEach(correlationID -> { - handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception)); - }); - 
} - - public void setResponseHandler(ResponseHandler responseHandler) { - this.responseHandler = responseHandler; - } - - public int size() { - return this.store.size(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java index 51637f3..da34d2e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class ActiveMQExceptionMessage extends PacketImpl { - protected ActiveMQException exception; + private ActiveMQException exception; // Static -------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java deleted file mode 100644 index 661a040..0000000 --- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.utils.DataConstants; - -public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage { - - private long correlationID; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) { - super(exception); - this.correlationID = correlationID; - } - - public ActiveMQExceptionMessage_V2() { - super(); - } - - // Public -------------------------------------------------------- - - @Override - public boolean isResponse() { - return true; - } - - @Override - public void encodeRest(final ActiveMQBuffer buffer) { - super.encodeRest(buffer); - buffer.writeLong(correlationID); - } - - @Override - 
public void decodeRest(final ActiveMQBuffer buffer) { - super.decodeRest(buffer); - if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { - correlationID = buffer.readLong(); - } - } - - @Override - public final boolean isResponseAsync() { - return true; - } - - @Override - public long getCorrelationID() { - return this.correlationID; - } - - @Override - public String toString() { - return getParentString() + ", exception= " + exception + "]"; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!super.equals(obj)) { - return false; - } - if (!(obj instanceof ActiveMQExceptionMessage_V2)) { - return false; - } - ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj; - if (correlationID != other.correlationID) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java index 8c84a9b..a98f888 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java @@ -65,7 +65,6 @@ public class CreateAddressMessage extends PacketImpl { return address; } - @Override public boolean isRequiresResponse() { return 
requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java index 985d5f4..2ebf147 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java @@ -100,7 +100,6 @@ public class CreateQueueMessage extends PacketImpl { return temporary; } - @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java index 3c072e0..af25ae9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java @@ -80,7 +80,6 @@ public class CreateSharedQueueMessage extends PacketImpl { return filterString; } - @Override public boolean isRequiresResponse() { return 
requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java deleted file mode 100644 index e3453af..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.utils.DataConstants; - -public class NullResponseMessage_V2 extends NullResponseMessage { - - private long correlationID; - - public NullResponseMessage_V2(final long correlationID) { - super(); - this.correlationID = correlationID; - } - - public NullResponseMessage_V2() { - super(); - } - - // Public -------------------------------------------------------- - - @Override - public long getCorrelationID() { - return correlationID; - } - - @Override - public void encodeRest(final ActiveMQBuffer buffer) { - super.encodeRest(buffer); - buffer.writeLong(correlationID); - } - - @Override - public void decodeRest(final ActiveMQBuffer buffer) { - super.decodeRest(buffer); - if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { - correlationID = buffer.readLong(); - } - } - - @Override - public final boolean isResponse() { - return true; - } - - @Override - public final boolean isResponseAsync() { - return true; - } - - @Override - public String toString() { - return getParentString() + ", correlationID=" + correlationID + "]"; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!super.equals(obj)) { - return false; - } - if (!(obj instanceof NullResponseMessage_V2)) { - return false; - } - NullResponseMessage_V2 other = (NullResponseMessage_V2) obj; - if (correlationID != other.correlationID) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java 
---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java index 67d9f67..542c34c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java @@ -51,7 +51,6 @@ public class SessionAcknowledgeMessage extends PacketImpl { return messageID; } - @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java index e07b50c..f09beeb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java @@ -71,7 +71,6 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { return browseOnly; } - @Override public boolean isRequiresResponse() { return requiresResponse; } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java index 3164c23..7d06081 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java @@ -60,7 +60,6 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl { return messageID; } - @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index 4105b11..26eedd7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -26,10 +26,10 @@ import 
org.apache.activemq.artemis.utils.DataConstants; */ public class SessionSendContinuationMessage extends SessionContinuationMessage { - protected boolean requiresResponse; + private boolean requiresResponse; // Used on confirmation handling - protected Message message; + private Message message; /** * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} * <br> @@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { /** * to be sent on the last package */ - protected long messageBodySize = -1; + private long messageBodySize = -1; // Static -------------------------------------------------------- @@ -54,11 +54,6 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { handler = null; } - protected SessionSendContinuationMessage(byte type) { - super(type); - handler = null; - } - /** * @param body * @param continues @@ -77,31 +72,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { this.messageBodySize = messageBodySize; } - /** - * @param body - * @param continues - * @param requiresResponse - */ - protected SessionSendContinuationMessage(final byte type, - final Message message, - final byte[] body, - final boolean continues, - final boolean requiresResponse, - final long messageBodySize, - SendAcknowledgementHandler handler) { - super(type, body, continues); - this.requiresResponse = requiresResponse; - this.message = message; - this.handler = handler; - this.messageBodySize = messageBodySize; - } - // Public -------------------------------------------------------- /** * @return the requiresResponse */ - @Override public boolean isRequiresResponse() { return requiresResponse; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java 
---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java deleted file mode 100644 index 2a3071c..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.utils.DataConstants; - -/** - * A SessionSendContinuationMessage<br> - */ -public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage { - - private long correlationID; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - public SessionSendContinuationMessage_V2() { - super(); - } - - /** - * @param body - * @param continues - * @param requiresResponse - */ - public SessionSendContinuationMessage_V2(final Message message, - final byte[] body, - final boolean continues, - final boolean requiresResponse, - final long messageBodySize, - SendAcknowledgementHandler handler) { - super(message, body, continues, requiresResponse, messageBodySize, handler); - } - - // Public -------------------------------------------------------- - - @Override - public int expectedEncodeSize() { - return super.expectedEncodeSize() + DataConstants.SIZE_LONG; - } - - @Override - public void encodeRest(final ActiveMQBuffer buffer) { - super.encodeRest(buffer); - buffer.writeLong(correlationID); - } - - @Override - public void decodeRest(final ActiveMQBuffer buffer) { - super.decodeRest(buffer); - if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { - correlationID = buffer.readLong(); - } - } - - @Override - public long getCorrelationID() { - return this.correlationID; - } - - @Override - public void setCorrelationID(long correlationID) { - this.correlationID = correlationID; - } - - @Override - public boolean isResponseAsync() { - return true; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * 
result + (int) (correlationID ^ (correlationID >>> 32)); - return result; - } - - @Override - public String toString() { - StringBuffer buff = new StringBuffer(getParentString()); - buff.append(", continues=" + continues); - buff.append(", message=" + message); - buff.append(", messageBodySize=" + messageBodySize); - buff.append(", requiresResponse=" + requiresResponse); - buff.append(", correlationID=" + correlationID); - buff.append("]"); - return buff.toString(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (!super.equals(obj)) - return false; - if (!(obj instanceof SessionSendContinuationMessage_V2)) - return false; - SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj; - if (correlationID != other.correlationID) - return false; - return true; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index e8dbdc1..b56ae30 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -21,7 +21,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.utils.DataConstants; public class 
SessionSendMessage extends MessagePacket { @@ -38,22 +37,6 @@ public class SessionSendMessage extends MessagePacket { private final transient SendAcknowledgementHandler handler; /** This will be using the CoreMessage because it is meant for the core-protocol */ - protected SessionSendMessage(final byte id, - final ICoreMessage message, - final boolean requiresResponse, - final SendAcknowledgementHandler handler) { - super(id, message); - this.handler = handler; - this.requiresResponse = requiresResponse; - } - - protected SessionSendMessage(final byte id, - final CoreMessage message) { - super(id, message); - this.handler = null; - } - - /** This will be using the CoreMessage because it is meant for the core-protocol */ public SessionSendMessage(final ICoreMessage message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { @@ -69,7 +52,6 @@ public class SessionSendMessage extends MessagePacket { // Public -------------------------------------------------------- - @Override public boolean isRequiresResponse() { return requiresResponse; } @@ -80,7 +62,7 @@ public class SessionSendMessage extends MessagePacket { @Override public int expectedEncodeSize() { - return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize(); + return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1; } @Override @@ -93,16 +75,13 @@ public class SessionSendMessage extends MessagePacket { public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part - ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize()); + ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); receiveMessage(messageBuffer); - buffer.readerIndex(buffer.capacity() - fieldsEncodeSize()); + buffer.readerIndex(buffer.capacity() - 1); requiresResponse = buffer.readBoolean(); - } - protected int fieldsEncodeSize() { - return DataConstants.SIZE_BOOLEAN; } protected void 
receiveMessage(ByteBuf messageBuffer) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java deleted file mode 100644 index 63c9a34..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.utils.DataConstants; - -public class SessionSendMessage_V2 extends SessionSendMessage { - - private long correlationID; - - /** This will be using the CoreMessage because it is meant for the core-protocol */ - public SessionSendMessage_V2(final ICoreMessage message, - final boolean requiresResponse, - final SendAcknowledgementHandler handler) { - super(SESS_SEND, message, requiresResponse, handler); - } - - public SessionSendMessage_V2(final CoreMessage message) { - super(SESS_SEND, message); - } - - @Override - public void encodeRest(ActiveMQBuffer buffer) { - super.encodeRest(buffer); - buffer.writeLong(correlationID); - } - - @Override - public void decodeRest(final ActiveMQBuffer buffer) { - super.decodeRest(buffer); - correlationID = buffer.readLong(); - } - - @Override - protected int fieldsEncodeSize() { - return super.fieldsEncodeSize() + DataConstants.SIZE_LONG; - } - - @Override - public long getCorrelationID() { - return this.correlationID; - } - - @Override - public void setCorrelationID(long correlationID) { - this.correlationID = correlationID; - } - - @Override - public boolean isResponseAsync() { - return true; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); - return result; - } - - - @Override - public String toString() { - StringBuffer buff = new StringBuffer(getParentString()); - buff.append(", correlationID=" + correlationID); - buff.append(", requiresResponse=" + super.isRequiresResponse()); - buff.append("]"); - return buff.toString(); - } - 
- @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (!super.equals(obj)) - return false; - if (!(obj instanceof SessionSendMessage_V2)) - return false; - SessionSendMessage_V2 other = (SessionSendMessage_V2) obj; - if (correlationID != other.correlationID) - return false; - return true; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java index f88e0c8..086b851 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java @@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionXAResponseMessage extends PacketImpl { - protected boolean error; + private boolean error; - protected int responseCode; + private int responseCode; - protected String message; + private String message; public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) { super(SESS_XA_RESP); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java ---------------------------------------------------------------------- diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java deleted file mode 100644 index 4e949bd..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.utils.DataConstants; - -public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage { - - private long correlationID; - - public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) { - super(isError, responseCode, message); - this.correlationID = correlationID; - } - - public SessionXAResponseMessage_V2() { - super(); - } - - // Public -------------------------------------------------------- - - @Override - public long getCorrelationID() { - return correlationID; - } - - @Override - public void encodeRest(final ActiveMQBuffer buffer) { - super.encodeRest(buffer); - buffer.writeLong(correlationID); - } - - @Override - public void decodeRest(final ActiveMQBuffer buffer) { - super.decodeRest(buffer); - if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { - correlationID = buffer.readLong(); - } - } - - @Override - public final boolean isResponse() { - return true; - } - - @Override - public final boolean isResponseAsync() { - return true; - } - - @Override - public String toString() { - StringBuffer buff = new StringBuffer(getParentString()); - buff.append(", error=" + error); - buff.append(", message=" + message); - buff.append(", responseCode=" + responseCode); - buff.append(", correlationID=" + correlationID); - buff.append("]"); - return buff.toString(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!super.equals(obj)) { - return false; - } - if (!(obj instanceof SessionXAResponseMessage_V2)) { - return false; - } - SessionXAResponseMessage_V2 other = 
(SessionXAResponseMessage_V2) obj; - if (correlationID != other.correlationID) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/resources/activemq-version.properties ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index ff65ff9..a39b422 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java deleted file mode 100644 index 416c911..0000000 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ /dev/null @@ -1,512 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.core.impl; - -import javax.security.auth.Subject; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelFutureListener; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.artemis.core.protocol.core.Channel; -import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; -import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; -import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; -import org.apache.activemq.artemis.core.remoting.CloseListener; -import org.apache.activemq.artemis.core.remoting.FailureListener; -import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Connection; -import 
org.apache.activemq.artemis.spi.core.remoting.ReadyListener; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class ChannelImplTest { - - ChannelImpl channel; - - @Before - public void setUp() { - channel = new ChannelImpl(new CoreRR(), 1, 4000, null); - } - - @Test - public void testCorrelation() { - - AtomicInteger handleResponseCount = new AtomicInteger(); - - RequestPacket requestPacket = new RequestPacket((byte) 1); - setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); - - channel.send(requestPacket); - - assertEquals(1, channel.getCache().size()); - - ResponsePacket responsePacket = new ResponsePacket((byte) 1); - responsePacket.setCorrelationID(requestPacket.getCorrelationID()); - - channel.handlePacket(responsePacket); - - assertEquals(1, handleResponseCount.get()); - assertEquals(0, channel.getCache().size()); - } - - private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) { - channel.setResponseHandler(responseHandler); - channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler)); - } - - private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) { - return new CommandConfirmationHandler() { - @Override - public void commandConfirmed(Packet packet) { - responseHandler.handleResponse(packet, null); - } - }; - } - - @Test - public void testPacketsConfirmedMessage() { - - AtomicInteger handleResponseCount = new AtomicInteger(); - - RequestPacket requestPacket = new RequestPacket((byte) 1); - setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); - - channel.send(requestPacket); - - PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2); - - channel.handlePacket(responsePacket); - - assertEquals(0, channel.getCache().size()); - } - - class RequestPacket extends 
PacketImpl { - - private long id; - - RequestPacket(byte type) { - super(type); - } - - @Override - public boolean isRequiresResponse() { - return true; - } - - @Override - public boolean isResponseAsync() { - return true; - } - - @Override - public long getCorrelationID() { - return id; - } - - @Override - public void setCorrelationID(long id) { - this.id = id; - } - - @Override - public int getPacketSize() { - return 0; - } - } - - class ResponsePacket extends PacketImpl { - - private long id; - - ResponsePacket(byte type) { - super(type); - } - - @Override - public boolean isResponseAsync() { - return true; - } - - @Override - public boolean isResponse() { - return true; - } - - @Override - public long getCorrelationID() { - return id; - } - - @Override - public void setCorrelationID(long id) { - this.id = id; - } - - @Override - public int getPacketSize() { - return 0; - } - } - - class CoreRR implements CoreRemotingConnection { - - @Override - public int getChannelVersion() { - return 0; - } - - @Override - public void setChannelVersion(int clientVersion) { - - } - - @Override - public Channel getChannel(long channelID, int confWindowSize) { - return null; - } - - @Override - public void putChannel(long channelID, Channel channel) { - - } - - @Override - public boolean removeChannel(long channelID) { - return false; - } - - @Override - public long generateChannelID() { - return 0; - } - - @Override - public void syncIDGeneratorSequence(long id) { - - } - - @Override - public long getIDGeneratorSequence() { - return 0; - } - - @Override - public long getBlockingCallTimeout() { - return 0; - } - - @Override - public long getBlockingCallFailoverTimeout() { - return 0; - } - - @Override - public Object getTransferLock() { - return null; - } - - @Override - public ActiveMQPrincipal getDefaultActiveMQPrincipal() { - return null; - } - - @Override - public boolean blockUntilWritable(int size, long timeout) { - return false; - } - - @Override - public Object getID() { 
- return null; - } - - @Override - public long getCreationTime() { - return 0; - } - - @Override - public String getRemoteAddress() { - return null; - } - - @Override - public void scheduledFlush() { - - } - - @Override - public void addFailureListener(FailureListener listener) { - - } - - @Override - public boolean removeFailureListener(FailureListener listener) { - return false; - } - - @Override - public void addCloseListener(CloseListener listener) { - - } - - @Override - public boolean removeCloseListener(CloseListener listener) { - return false; - } - - @Override - public List<CloseListener> removeCloseListeners() { - return null; - } - - @Override - public void setCloseListeners(List<CloseListener> listeners) { - - } - - @Override - public List<FailureListener> getFailureListeners() { - return null; - } - - @Override - public List<FailureListener> removeFailureListeners() { - return null; - } - - @Override - public void setFailureListeners(List<FailureListener> listeners) { - - } - - @Override - public ActiveMQBuffer createTransportBuffer(int size) { - return new ChannelBufferWrapper(Unpooled.buffer(size)); - } - - @Override - public void fail(ActiveMQException me) { - - } - - @Override - public void fail(ActiveMQException me, String scaleDownTargetNodeID) { - - } - - @Override - public void destroy() { - - } - - @Override - public Connection getTransportConnection() { - return new Connection() { - @Override - public ActiveMQBuffer createTransportBuffer(int size) { - return null; - } - - @Override - public RemotingConnection getProtocolConnection() { - return null; - } - - @Override - public void setProtocolConnection(RemotingConnection connection) { - - } - - @Override - public boolean isWritable(ReadyListener listener) { - return false; - } - - @Override - public void fireReady(boolean ready) { - - } - - @Override - public void setAutoRead(boolean autoRead) { - - } - - @Override - public Object getID() { - return null; - } - - @Override - public void 
write(ActiveMQBuffer buffer, boolean flush, boolean batched) { - - } - - @Override - public void write(ActiveMQBuffer buffer, - boolean flush, - boolean batched, - ChannelFutureListener futureListener) { - - } - - @Override - public void write(ActiveMQBuffer buffer) { - - } - - @Override - public void forceClose() { - - } - - @Override - public void close() { - - } - - @Override - public String getRemoteAddress() { - return null; - } - - @Override - public String getLocalAddress() { - return null; - } - - @Override - public void checkFlushBatchBuffer() { - - } - - @Override - public TransportConfiguration getConnectorConfig() { - return null; - } - - @Override - public ActiveMQPrincipal getDefaultActiveMQPrincipal() { - return null; - } - - @Override - public boolean isUsingProtocolHandling() { - return false; - } - - @Override - public boolean isSameTarget(TransportConfiguration... configs) { - return false; - } - }; - } - - @Override - public boolean isClient() { - return true; - } - - @Override - public boolean isDestroyed() { - return false; - } - - @Override - public void disconnect(boolean criticalError) { - - } - - @Override - public void disconnect(String scaleDownNodeID, boolean criticalError) { - - } - - @Override - public boolean checkDataReceived() { - return false; - } - - @Override - public void flush() { - - } - - @Override - public boolean isWritable(ReadyListener callback) { - return false; - } - - @Override - public void killMessage(SimpleString nodeID) { - - } - - @Override - public boolean isSupportReconnect() { - return false; - } - - @Override - public boolean isSupportsFlowControl() { - return false; - } - - @Override - public Subject getSubject() { - return null; - } - - @Override - public String getProtocolName() { - return null; - } - - @Override - public void setClientID(String cID) { - - } - - @Override - public String getClientID() { - return null; - } - - @Override - public String getTransportLocalAddress() { - return null; - } - - 
@Override - public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { - - } - } - -} \ No newline at end of file
