http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java new file mode 100644 index 0000000..c13a4e8 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java @@ -0,0 +1,474 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl; + +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V3; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATESESSION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT_V2; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.EXCEPTION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.NULL_RESPONSE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.PING; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.REATTACH_SESSION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.REATTACH_SESSION_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_CLOSE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_COMMIT; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_EXPIRED; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_CREDITS; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_FAIL_CREDITS; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_START; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_STOP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_END; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_START; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2; + +import java.io.Serializable; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage_V2; +import org.apache.activemq6.core.protocol.core.impl.wireformat.HornetQExceptionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.NullResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.Ping; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.RollbackMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCloseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCommitMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionExpireMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXACommitMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAEndMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAForgetMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAJoinMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResumeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXARollbackMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; + +/** + * A PacketDecoder + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public abstract class PacketDecoder implements Serializable +{ + public abstract Packet decode(final HornetQBuffer in); + + public Packet decode(byte packetType) + { + Packet packet; + + switch (packetType) + { + case PING: + { + packet = new Ping(); + break; + } + case DISCONNECT: + { + packet = new DisconnectMessage(); + break; + } + case DISCONNECT_V2: + { + packet = new DisconnectMessage_V2(); + break; + } + case DISCONNECT_CONSUMER: + { + packet = new DisconnectConsumerMessage(); + break; + } + case EXCEPTION: + { + packet = new HornetQExceptionMessage(); + break; + } + case PACKETS_CONFIRMED: + { + packet = new PacketsConfirmedMessage(); + break; + } + case CREATESESSION: + { + packet = new CreateSessionMessage(); + break; + } + case CHECK_FOR_FAILOVER: + { + packet = new CheckFailoverMessage(); + break; + } + case CREATESESSION_RESP: + { + packet = new CreateSessionResponseMessage(); + break; + } + case REATTACH_SESSION: + { + packet = new ReattachSessionMessage(); + break; + } + case REATTACH_SESSION_RESP: + { + packet = new ReattachSessionResponseMessage(); + break; + } + case SESS_CLOSE: + { + packet = new SessionCloseMessage(); + break; + } + case SESS_CREATECONSUMER: + { + packet = new SessionCreateConsumerMessage(); + break; + } + case SESS_ACKNOWLEDGE: + { + packet = new SessionAcknowledgeMessage(); + break; + } + case SESS_EXPIRED: + { + packet = new SessionExpireMessage(); + break; + } + case SESS_COMMIT: + { + packet = new SessionCommitMessage(); + break; + } + case SESS_ROLLBACK: + { + packet = new RollbackMessage(); + break; + } + case SESS_QUEUEQUERY: + { + packet = new SessionQueueQueryMessage(); + break; + } + case SESS_QUEUEQUERY_RESP: + { + packet = new SessionQueueQueryResponseMessage(); + break; + } + case CREATE_QUEUE: + { + packet = new CreateQueueMessage(); + break; + } + case CREATE_SHARED_QUEUE: + { + packet = new CreateSharedQueueMessage(); + break; + } + case DELETE_QUEUE: + { + packet = new SessionDeleteQueueMessage(); + break; + } + case SESS_BINDINGQUERY: + { + packet = new SessionBindingQueryMessage(); + break; + } + case SESS_BINDINGQUERY_RESP: + { + packet = new SessionBindingQueryResponseMessage(); + break; + } + case SESS_XA_START: + { + packet = new SessionXAStartMessage(); + break; + } + case SESS_XA_FAILED: + { + packet = new SessionXAAfterFailedMessage(); + break; + } + case SESS_XA_END: + { + packet = new SessionXAEndMessage(); + break; + } + case SESS_XA_COMMIT: + { + packet = new SessionXACommitMessage(); + break; + } + case SESS_XA_PREPARE: + { + packet = new SessionXAPrepareMessage(); + break; + } + case SESS_XA_RESP: + { + packet = new SessionXAResponseMessage(); + break; + } + case SESS_XA_ROLLBACK: + { + packet = new SessionXARollbackMessage(); + break; + } + case SESS_XA_JOIN: + { + packet = new SessionXAJoinMessage(); + break; + } + case SESS_XA_SUSPEND: + { + packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND); + break; + } + case SESS_XA_RESUME: + { + packet = new SessionXAResumeMessage(); + break; + } + case SESS_XA_FORGET: + { + packet = new SessionXAForgetMessage(); + break; + } + case SESS_XA_INDOUBT_XIDS: + { + packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS); + break; + } + case SESS_XA_INDOUBT_XIDS_RESP: + { + packet = new SessionXAGetInDoubtXidsResponseMessage(); + break; + } + case SESS_XA_SET_TIMEOUT: + { + packet = new SessionXASetTimeoutMessage(); + break; + } + case SESS_XA_SET_TIMEOUT_RESP: + { + packet = new SessionXASetTimeoutResponseMessage(); + break; + } + case SESS_XA_GET_TIMEOUT: + { + packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT); + break; + } + case SESS_XA_GET_TIMEOUT_RESP: + { + packet = new SessionXAGetTimeoutResponseMessage(); + break; + } + case SESS_START: + { + packet = new PacketImpl(PacketImpl.SESS_START); + break; + } + case SESS_STOP: + { + packet = new PacketImpl(PacketImpl.SESS_STOP); + break; + } + case SESS_FLOWTOKEN: + { + packet = new SessionConsumerFlowCreditMessage(); + break; + } + case SESS_CONSUMER_CLOSE: + { + packet = new SessionConsumerCloseMessage(); + break; + } + case SESS_INDIVIDUAL_ACKNOWLEDGE: + { + packet = new SessionIndividualAcknowledgeMessage(); + break; + } + case NULL_RESPONSE: + { + packet = new NullResponseMessage(); + break; + } + case SESS_RECEIVE_CONTINUATION: + { + packet = new SessionReceiveContinuationMessage(); + break; + } + case SESS_SEND_CONTINUATION: + { + packet = new SessionSendContinuationMessage(); + break; + } + case SESS_PRODUCER_REQUEST_CREDITS: + { + packet = new SessionRequestProducerCreditsMessage(); + break; + } + case SESS_PRODUCER_CREDITS: + { + packet = new SessionProducerCreditsMessage(); + break; + } + case SESS_PRODUCER_FAIL_CREDITS: + { + packet = new SessionProducerCreditsFailMessage(); + break; + } + case SESS_FORCE_CONSUMER_DELIVERY: + { + packet = new SessionForceConsumerDelivery(); + break; + } + case CLUSTER_TOPOLOGY: + { + packet = new ClusterTopologyChangeMessage(); + break; + } + case CLUSTER_TOPOLOGY_V2: + { + packet = new ClusterTopologyChangeMessage_V2(); + break; + } + case CLUSTER_TOPOLOGY_V3: + { + packet = new ClusterTopologyChangeMessage_V3(); + break; + } + case SUBSCRIBE_TOPOLOGY: + { + packet = new SubscribeClusterTopologyUpdatesMessage(); + break; + } + case SUBSCRIBE_TOPOLOGY_V2: + { + packet = new SubscribeClusterTopologyUpdatesMessageV2(); + break; + } + case SESS_ADD_METADATA: + { + packet = new SessionAddMetaDataMessage(); + break; + } + case SESS_ADD_METADATA2: + { + packet = new SessionAddMetaDataMessageV2(); + break; + } + case SESS_UNIQUE_ADD_METADATA: + { + packet = new SessionUniqueAddMetaDataMessage(); + break; + } + case PacketImpl.CHECK_FOR_FAILOVER_REPLY: + { + packet = new CheckFailoverReplyMessage(); + break; + } + default: + { + throw HornetQClientMessageBundle.BUNDLE.invalidType(packetType); + } + } + + return packet; + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java new file mode 100644 index 0000000..66f72fe --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java @@ -0,0 +1,379 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.utils.DataConstants; + +/** + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class PacketImpl implements Packet +{ + // Constants ------------------------------------------------------------------------- + + // The minimal size for all the packets, Common data for all the packets (look at + // PacketImpl.encode) + public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + + DataConstants.SIZE_LONG; + + private static final int INITIAL_PACKET_SIZE = 1500; + + protected long channelID; + + private final byte type; + + protected int size = -1; + + // The packet types + // ----------------------------------------------------------------------------------- + + public static final byte PING = 10; + + public static final byte DISCONNECT = 11; + + public static final byte DISCONNECT_CONSUMER = 12; + + // Miscellaneous + public static final byte EXCEPTION = 20; + + public static final byte NULL_RESPONSE = 21; + + public static final byte PACKETS_CONFIRMED = 22; + + // Server + public static final byte CREATESESSION = 30; + + public static final byte CREATESESSION_RESP = 31; + + public static final byte REATTACH_SESSION = 32; + + public static final byte REATTACH_SESSION_RESP = 33; + + public static final byte CREATE_QUEUE = 34; + + public static final byte DELETE_QUEUE = 35; + + public static final byte CREATE_SHARED_QUEUE = 36; + + // Session + + public static final byte SESS_XA_FAILED = 39; + + public static final byte SESS_CREATECONSUMER = 40; + + public static final byte SESS_ACKNOWLEDGE = 41; + + public static final byte SESS_EXPIRED = 42; + + public static final byte SESS_COMMIT = 43; + + public static final byte SESS_ROLLBACK = 44; + + public static final byte SESS_QUEUEQUERY = 45; + + public static final byte SESS_QUEUEQUERY_RESP = 46; + + public static final byte SESS_BINDINGQUERY = 49; + + public static final byte SESS_BINDINGQUERY_RESP = 50; + + public static final byte SESS_XA_START = 51; + + public static final byte SESS_XA_END = 52; + + public static final byte SESS_XA_COMMIT = 53; + + public static final byte SESS_XA_PREPARE = 54; + + public static final byte SESS_XA_RESP = 55; + + public static final byte SESS_XA_ROLLBACK = 56; + + public static final byte SESS_XA_JOIN = 57; + + public static final byte SESS_XA_SUSPEND = 58; + + public static final byte SESS_XA_RESUME = 59; + + public static final byte SESS_XA_FORGET = 60; + + public static final byte SESS_XA_INDOUBT_XIDS = 61; + + public static final byte SESS_XA_INDOUBT_XIDS_RESP = 62; + + public static final byte SESS_XA_SET_TIMEOUT = 63; + + public static final byte SESS_XA_SET_TIMEOUT_RESP = 64; + + public static final byte SESS_XA_GET_TIMEOUT = 65; + + public static final byte SESS_XA_GET_TIMEOUT_RESP = 66; + + public static final byte SESS_START = 67; + + public static final byte SESS_STOP = 68; + + public static final byte SESS_CLOSE = 69; + + public static final byte SESS_FLOWTOKEN = 70; + + public static final byte SESS_SEND = 71; + + public static final byte SESS_SEND_LARGE = 72; + + public static final byte SESS_SEND_CONTINUATION = 73; + + public static final byte SESS_CONSUMER_CLOSE = 74; + + public static final byte SESS_RECEIVE_MSG = 75; + + public static final byte SESS_RECEIVE_LARGE_MSG = 76; + + public static final byte SESS_RECEIVE_CONTINUATION = 77; + + public static final byte SESS_FORCE_CONSUMER_DELIVERY = 78; + + public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79; + + public static final byte SESS_PRODUCER_CREDITS = 80; + + public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81; + + public static final byte SESS_PRODUCER_FAIL_CREDITS = 82; + + // Replication + + public static final byte REPLICATION_RESPONSE = 90; + + public static final byte REPLICATION_APPEND = 91; + + public static final byte REPLICATION_APPEND_TX = 92; + + public static final byte REPLICATION_DELETE = 93; + + public static final byte REPLICATION_DELETE_TX = 94; + + public static final byte REPLICATION_PREPARE = 95; + + public static final byte REPLICATION_COMMIT_ROLLBACK = 96; + + public static final byte REPLICATION_PAGE_WRITE = 97; + + public static final byte REPLICATION_PAGE_EVENT = 98; + + public static final byte REPLICATION_LARGE_MESSAGE_BEGIN = 99; + + public static final byte REPLICATION_LARGE_MESSAGE_END = 100; + + public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 101; + + /* + * code 102 was REPLICATION_COMPARE_DATA, released into production as a message, but as part of + * the (then) non-function replication system. + */ + + public static final byte REPLICATION_SYNC_FILE = 103; + + public static final byte SESS_ADD_METADATA = 104; + + public static final byte SESS_ADD_METADATA2 = 105; + + public static final byte SESS_UNIQUE_ADD_METADATA = 106; + + + + // HA + + public static final byte CLUSTER_TOPOLOGY = 110; + + public static final byte NODE_ANNOUNCE = 111; + + public static final byte SUBSCRIBE_TOPOLOGY = 112; + + // For newer versions + + public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113; + + public static final byte CLUSTER_TOPOLOGY_V2 = 114; + + public static final byte BACKUP_REGISTRATION = 115; + public static final byte BACKUP_REGISTRATION_FAILED = 116; + + public static final byte REPLICATION_START_FINISH_SYNC = 120; + public static final byte REPLICATION_SCHEDULED_FAILOVER = 121; + + public static final byte CLUSTER_TOPOLOGY_V3 = 122; + + //do not reuse + //public static final byte NODE_ANNOUNCE_V2 = 123; + + public static final byte DISCONNECT_V2 = 124; + + public static final byte CLUSTER_CONNECT = 125; + + public static final byte CLUSTER_CONNECT_REPLY = 126; + + public static final byte BACKUP_REQUEST = 127; + + //oops ran out of positive bytes + public static final byte BACKUP_REQUEST_RESPONSE = -1; + + public static final byte QUORUM_VOTE = -2; + + public static final byte QUORUM_VOTE_REPLY = -3; + + public static final byte CHECK_FOR_FAILOVER = -4; + + public static final byte CHECK_FOR_FAILOVER_REPLY = -5; + + public static final byte SCALEDOWN_ANNOUNCEMENT = -6; + + // Static -------------------------------------------------------- + + public PacketImpl(final byte type) + { + this.type = type; + } + + // Public -------------------------------------------------------- + + public byte getType() + { + return type; + } + + public long getChannelID() + { + return channelID; + } + + public void setChannelID(final long channelID) + { + this.channelID = channelID; + } + + public HornetQBuffer encode(final RemotingConnection connection) + { + HornetQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE); + + // The standard header fields + + buffer.writeInt(0); // The length gets filled in at the end + buffer.writeByte(type); + buffer.writeLong(channelID); + + encodeRest(buffer); + + size = buffer.writerIndex(); + + // The length doesn't include the actual length byte + int len = size - DataConstants.SIZE_INT; + + buffer.setInt(0, len); + + return buffer; + } + + public void decode(final HornetQBuffer buffer) + { + channelID = buffer.readLong(); + + decodeRest(buffer); + + size = buffer.readerIndex(); + } + + public int getPacketSize() + { + if (size == -1) + { + throw new IllegalStateException("Packet hasn't been encoded/decoded yet"); + } + + return size; + } + + public boolean isResponse() + { + return false; + } + + public void encodeRest(final HornetQBuffer buffer) + { + } + + public void decodeRest(final HornetQBuffer buffer) + { + } + + public boolean isRequiresConfirmations() + { + return true; + } + + public boolean isAsyncExec() + { + return false; + } + + @Override + public String toString() + { + return getParentString() + "]"; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + (int)(channelID ^ (channelID >>> 32)); + result = prime * result + size; + result = prime * result + type; + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (!(obj instanceof PacketImpl)) + { + return false; + } + PacketImpl other = (PacketImpl)obj; + return (channelID == other.channelID) && (size == other.size) && (type != other.type); + } + + protected String getParentString() + { + return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName(); + } + + private int stringEncodeSize(final String str) + { + return DataConstants.SIZE_INT + str.length() * 2; + } + + protected int nullableStringEncodeSize(final String str) + { + return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java new file mode 100644 index 0000000..f30f073 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java @@ -0,0 +1,470 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Interceptor; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.protocol.core.Channel; +import org.apache.activemq6.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.core.protocol.core.impl.ChannelImpl.CHANNEL_ID; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage_V2; +import org.apache.activemq6.core.security.HornetQPrincipal; +import org.apache.activemq6.spi.core.protocol.AbstractRemotingConnection; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.utils.SimpleIDGenerator; + +/** + * @author <a href="[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + */ +public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection +{ + // Constants + // ------------------------------------------------------------------------------------ + + private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + + // Static + // --------------------------------------------------------------------------------------- + + // Attributes + // ----------------------------------------------------------------------------------- + private final PacketDecoder packetDecoder; + + private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>(); + + private final long blockingCallTimeout; + + private final long blockingCallFailoverTimeout; + + private final List<Interceptor> incomingInterceptors; + + private final List<Interceptor> outgoingInterceptors; + + private volatile boolean destroyed; + + private final boolean client; + + private int clientVersion; + + private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id); + + private boolean idGeneratorSynced = false; + + private final Object transferLock = new Object(); + + private final Object failLock = new Object(); + + private volatile boolean executing; + + private final SimpleString nodeID; + + private String clientID; + + // Constructors + // --------------------------------------------------------------------------------- + + /* + * Create a client side connection + */ + public RemotingConnectionImpl(final PacketDecoder packetDecoder, + final Connection transportConnection, + final long blockingCallTimeout, + final long blockingCallFailoverTimeout, + final List<Interceptor> incomingInterceptors, + final List<Interceptor> outgoingInterceptors) + { + this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, null); + } + + /* + * Create a server side connection + */ + RemotingConnectionImpl(final PacketDecoder packetDecoder, + final Connection transportConnection, + final List<Interceptor> incomingInterceptors, + final List<Interceptor> outgoingInterceptors, + final Executor executor, + final SimpleString nodeID) + + { + this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, executor, nodeID); + } + + private RemotingConnectionImpl(final PacketDecoder packetDecoder, + final Connection transportConnection, + final long blockingCallTimeout, + final long blockingCallFailoverTimeout, + final List<Interceptor> incomingInterceptors, + final List<Interceptor> outgoingInterceptors, + final boolean client, + final Executor executor, + final SimpleString nodeID) + + { + super(transportConnection, executor); + + this.packetDecoder = packetDecoder; + + this.blockingCallTimeout = blockingCallTimeout; + + this.blockingCallFailoverTimeout = blockingCallFailoverTimeout; + + this.incomingInterceptors = incomingInterceptors; + + this.outgoingInterceptors = outgoingInterceptors; + + this.client = client; + + this.nodeID = nodeID; + + transportConnection.setProtocolConnection(this); + } + + + // RemotingConnection implementation + // ------------------------------------------------------------ + + @Override + public String toString() + { + return "RemotingConnectionImpl [clientID=" + clientID + + ", nodeID=" + + nodeID + + ", transportConnection=" + + getTransportConnection() + + "]"; + } + + /** + * @return the clientVersion + */ + public int getClientVersion() + { + return clientVersion; + } + + /** + * @param clientVersion the clientVersion to set + */ + public void setClientVersion(int clientVersion) + { + this.clientVersion = clientVersion; + } + + public synchronized Channel getChannel(final long channelID, final int confWindowSize) + { + Channel channel = channels.get(channelID); + + if (channel == null) + { + channel = new ChannelImpl(this, channelID, confWindowSize, outgoingInterceptors); + + channels.put(channelID, channel); + } + + return channel; + } + + public synchronized boolean removeChannel(final long channelID) + { + return channels.remove(channelID) != null; + } + + public synchronized void putChannel(final long channelID, final Channel channel) + { + channels.put(channelID, channel); + } + + public void fail(final HornetQException me, String scaleDownTargetNodeID) + { + synchronized (failLock) + { + if (destroyed) + { + return; + } + + destroyed = true; + } + + HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + + + try + { + transportConnection.forceClose(); + } + catch (Throwable e) + { + HornetQClientLogger.LOGGER.warn(e.getMessage(), e); + } + + // Then call the listeners + callFailureListeners(me, scaleDownTargetNodeID); + + callClosingListeners(); + + internalClose(); + + for (Channel channel : channels.values()) + { + channel.returnBlocking(me); + } + } + + public void destroy() + { + synchronized (failLock) + { + if (destroyed) + { + return; + } + + destroyed = true; + } + + internalClose(); + + callClosingListeners(); + } + + public void disconnect(final boolean criticalError) + { + disconnect(null, criticalError); + } + + public void disconnect(String scaleDownNodeID, final boolean criticalError) + { + Channel channel0 = getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); + + // And we remove all channels from the connection, this ensures no more packets will be processed after this + // method is + // complete + + Set<Channel> allChannels = new HashSet<Channel>(channels.values()); + + if (!criticalError) + { + removeAllChannels(); + } + else + { + // We can't hold a lock if a critical error is happening... + // as other threads will be holding the lock while hanging on IO + channels.clear(); + } + + // Now we are 100% sure that no more packets will be processed we can flush then send the disconnect + + if (!criticalError) + { + for (Channel channel : allChannels) + { + channel.flushConfirmations(); + } + } + Packet disconnect; + + if (channel0.supports(PacketImpl.DISCONNECT_V2)) + { + disconnect = new DisconnectMessage_V2(nodeID, scaleDownNodeID); + } + else + { + disconnect = new DisconnectMessage(nodeID); + } + channel0.sendAndFlush(disconnect); + } + + public long generateChannelID() + { + return idGenerator.generateID(); + } + + public synchronized void syncIDGeneratorSequence(final long id) + { + if (!idGeneratorSynced) + { + idGenerator = new SimpleIDGenerator(id); + + idGeneratorSynced = true; + } + } + + public long getIDGeneratorSequence() + { + return idGenerator.getCurrentID(); + } + + public Object getTransferLock() + { + return transferLock; + } + + public boolean isClient() + { + return client; + } + + public boolean isDestroyed() + { + return destroyed; + } + + public long getBlockingCallTimeout() + { + return blockingCallTimeout; + } + + @Override + public long getBlockingCallFailoverTimeout() + { + return blockingCallFailoverTimeout; + } + + //We flush any confirmations on the connection - this prevents idle bridges for example + //sitting there with many unacked messages + public void flush() + { + synchronized (transferLock) + { + for (Channel channel : channels.values()) + { + channel.flushConfirmations(); + } + } + } + + public HornetQPrincipal getDefaultHornetQPrincipal() + { + return getTransportConnection().getDefaultHornetQPrincipal(); + } + + // Buffer Handler implementation + // ---------------------------------------------------- + public void bufferReceived(final Object connectionID, final HornetQBuffer buffer) + { + try + { + final Packet packet = packetDecoder.decode(buffer); + + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("handling packet " + packet); + } + + if (packet.isAsyncExec() && executor != null) + { + executing = true; + + executor.execute(new Runnable() + { + public void run() + { + try + { + doBufferReceived(packet); + } + catch (Throwable t) + { + HornetQClientLogger.LOGGER.errorHandlingPacket(t, packet); + } + + executing = false; + } + }); + } + else + { + //To prevent out of order execution if interleaving sync and async operations on same connection + while (executing) + { + Thread.yield(); + } + + // Pings must always be handled out of band so we can send pings back to the client quickly + // otherwise they would get in the queue with everything else which might give an intolerable delay + doBufferReceived(packet); + } + + super.bufferReceived(connectionID, buffer); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorDecodingPacket(e); + } + } + + private void doBufferReceived(final Packet packet) + { + if (ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null) + { + return; + } + + synchronized (transferLock) + { + final Channel channel = channels.get(packet.getChannelID()); + + if (channel != null) + { + channel.handlePacket(packet); + } + } + } + + protected void removeAllChannels() + { + // We get the transfer lock first - this ensures no packets are being processed AND + // it's guaranteed no more packets will be processed once this method is complete + synchronized (transferLock) + { + channels.clear(); + } + } + + private void internalClose() + { + // We close the underlying transport connection + getTransportConnection().close(); + + for (Channel channel : channels.values()) + { + channel.close(); + } + } + + public void setClientID(String cID) + { + clientID = cID; + } + + public String getClientID() + { + return clientID; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java new file mode 100644 index 0000000..ab1ec8c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java @@ -0,0 +1,49 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +public class CheckFailoverMessage extends PacketImpl +{ + private String nodeID; + + public CheckFailoverMessage(final String nodeID) + { + super(CHECK_FOR_FAILOVER); + this.nodeID = nodeID; + } + + public CheckFailoverMessage() + { + super(CHECK_FOR_FAILOVER); + } + + @Override + public void encodeRest(HornetQBuffer buffer) + { + buffer.writeNullableString(nodeID); + } + + @Override + public void decodeRest(HornetQBuffer buffer) + { + nodeID = buffer.readNullableString(); + } + + public String getNodeID() + { + return nodeID; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java new file mode 100644 index 0000000..d2fbc49 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java @@ -0,0 +1,56 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +public class CheckFailoverReplyMessage extends PacketImpl +{ + private boolean okToFailover; + + public CheckFailoverReplyMessage(boolean okToFailover) + { + super(CHECK_FOR_FAILOVER_REPLY); + this.okToFailover = okToFailover; + } + + public CheckFailoverReplyMessage() + { + super(CHECK_FOR_FAILOVER_REPLY); + } + + @Override + public boolean isResponse() + { + return true; + } + + @Override + public void encodeRest(HornetQBuffer buffer) + { + buffer.writeBoolean(okToFailover); + } + + @Override + public void decodeRest(HornetQBuffer buffer) + { + okToFailover = buffer.readBoolean(); + } + + public boolean isOkToFailover() + { + return okToFailover; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java new file mode 100644 index 0000000..0c3a998 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java @@ -0,0 +1,217 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class ClusterTopologyChangeMessage extends PacketImpl +{ + protected boolean exit; + + protected String nodeID; + + protected Pair<TransportConfiguration, TransportConfiguration> pair; + + protected boolean last; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last) + { + super(CLUSTER_TOPOLOGY); + + this.nodeID = nodeID; + + this.pair = pair; + + this.last = last; + + this.exit = false; + } + + public ClusterTopologyChangeMessage(final String nodeID) + { + super(CLUSTER_TOPOLOGY); + + this.exit = true; + + this.nodeID = nodeID; + } + + public ClusterTopologyChangeMessage() + { + super(CLUSTER_TOPOLOGY); + } + + // Public -------------------------------------------------------- + + /** + * @param clusterTopologyV2 + */ + public ClusterTopologyChangeMessage(byte clusterTopologyV2) + { + super(clusterTopologyV2); + } + + public String getNodeID() + { + return nodeID; + } + + public Pair<TransportConfiguration, TransportConfiguration> getPair() + { + return pair; + } + + public boolean isLast() + { + return last; + } + + public boolean isExit() + { + return exit; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeBoolean(exit); + buffer.writeString(nodeID); + if (!exit) + { + if (pair.getA() != null) + { + buffer.writeBoolean(true); + pair.getA().encode(buffer); + } + else + { + buffer.writeBoolean(false); + } + if (pair.getB() != null) + { + buffer.writeBoolean(true); + pair.getB().encode(buffer); + } + else + { + buffer.writeBoolean(false); + } + buffer.writeBoolean(last); + } + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + exit = buffer.readBoolean(); + nodeID = buffer.readString(); + if (!exit) + { + boolean hasLive = buffer.readBoolean(); + TransportConfiguration a; + if (hasLive) + { + a = new TransportConfiguration(); + a.decode(buffer); + } + else + { + a = null; + } + boolean hasBackup = buffer.readBoolean(); + TransportConfiguration b; + if (hasBackup) + { + b = new TransportConfiguration(); + b.decode(buffer); + } + else + { + b = null; + } + pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b); + last = buffer.readBoolean(); + } + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (exit ? 1231 : 1237); + result = prime * result + (last ? 1231 : 1237); + result = prime * result + ((nodeID == null) ? 0 : nodeID.hashCode()); + result = prime * result + ((pair == null) ? 0 : pair.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (!super.equals(obj)) + { + return false; + } + if (!(obj instanceof ClusterTopologyChangeMessage)) + { + return false; + } + ClusterTopologyChangeMessage other = (ClusterTopologyChangeMessage) obj; + if (exit != other.exit) + { + return false; + } + if (last != other.last) + { + return false; + } + if (nodeID == null) + { + if (other.nodeID != null) + { + return false; + } + } + else if (!nodeID.equals(other.nodeID)) + { + return false; + } + if (pair == null) + { + if (other.pair != null) + { + return false; + } + } + else if (!pair.equals(other.pair)) + { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java new file mode 100644 index 0000000..5a1eeb9 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java @@ -0,0 +1,192 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; + +/** + * Clebert Suconic + */ +public class ClusterTopologyChangeMessage_V2 extends ClusterTopologyChangeMessage +{ + protected long uniqueEventID; + protected String backupGroupName; + + public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final String backupGroupName, + final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last) + { + super(CLUSTER_TOPOLOGY_V2); + + this.nodeID = nodeID; + + this.pair = pair; + + this.last = last; + + this.exit = false; + + this.uniqueEventID = uniqueEventID; + + this.backupGroupName = backupGroupName; + } + + public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID) + { + super(CLUSTER_TOPOLOGY_V2); + + this.exit = true; + + this.nodeID = nodeID; + + this.uniqueEventID = uniqueEventID; + } + + public ClusterTopologyChangeMessage_V2() + { + super(CLUSTER_TOPOLOGY_V2); + } + + public ClusterTopologyChangeMessage_V2(byte clusterTopologyV3) + { + super(clusterTopologyV3); + } + + /** + * @return the uniqueEventID + */ + public long getUniqueEventID() + { + return uniqueEventID; + } + + public String getBackupGroupName() + { + return backupGroupName; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeBoolean(exit); + buffer.writeString(nodeID); + buffer.writeLong(uniqueEventID); + if (!exit) + { + if (pair.getA() != null) + { + buffer.writeBoolean(true); + pair.getA().encode(buffer); + } + else + { + buffer.writeBoolean(false); + } + if (pair.getB() != null) + { + buffer.writeBoolean(true); + pair.getB().encode(buffer); + } + else + { + buffer.writeBoolean(false); + } + buffer.writeBoolean(last); + } + buffer.writeNullableString(backupGroupName); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + exit = buffer.readBoolean(); + nodeID = buffer.readString(); + uniqueEventID = buffer.readLong(); + if (!exit) + { + boolean hasLive = buffer.readBoolean(); + TransportConfiguration a; + if (hasLive) + { + a = new TransportConfiguration(); + a.decode(buffer); + } + else + { + a = null; + } + boolean hasBackup = buffer.readBoolean(); + TransportConfiguration b; + if (hasBackup) + { + b = new TransportConfiguration(); + b.decode(buffer); + } + else + { + b = null; + } + pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b); + last = buffer.readBoolean(); + } + if (buffer.readableBytes() > 0) + { + backupGroupName = buffer.readNullableString(); + } + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((backupGroupName == null) ? 0 : backupGroupName.hashCode()); + result = prime * result + (int) (uniqueEventID ^ (uniqueEventID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (!super.equals(obj)) + { + return false; + } + if (!(obj instanceof ClusterTopologyChangeMessage_V2)) + { + return false; + } + ClusterTopologyChangeMessage_V2 other = (ClusterTopologyChangeMessage_V2) obj; + if (uniqueEventID != other.uniqueEventID) + { + return false; + } + if (backupGroupName == null) + { + if (other.backupGroupName != null) + { + return false; + } + } + else if (!backupGroupName.equals(other.backupGroupName)) + { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java new file mode 100644 index 0000000..7a78e6c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java @@ -0,0 +1,108 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; + +/** + * @author Justin Bertram + */ +public class ClusterTopologyChangeMessage_V3 extends ClusterTopologyChangeMessage_V2 +{ + private String scaleDownGroupName; + + public ClusterTopologyChangeMessage_V3(final long uniqueEventID, final String nodeID, final String backupGroupName, final String scaleDownGroupName, + final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last) + { + super(CLUSTER_TOPOLOGY_V3); + + this.nodeID = nodeID; + + this.pair = pair; + + this.last = last; + + this.exit = false; + + this.uniqueEventID = uniqueEventID; + + this.backupGroupName = backupGroupName; + + this.scaleDownGroupName = scaleDownGroupName; + } + + public ClusterTopologyChangeMessage_V3() + { + super(CLUSTER_TOPOLOGY_V3); + } + + public String getScaleDownGroupName() + { + return scaleDownGroupName; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + super.encodeRest(buffer); + buffer.writeNullableString(scaleDownGroupName); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + super.decodeRest(buffer); + scaleDownGroupName = buffer.readNullableString(); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((scaleDownGroupName == null) ? 0 : scaleDownGroupName.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (!super.equals(obj)) + { + return false; + } + if (!(obj instanceof ClusterTopologyChangeMessage_V3)) + { + return false; + } + ClusterTopologyChangeMessage_V3 other = (ClusterTopologyChangeMessage_V3) obj; + if (scaleDownGroupName == null) + { + if (other.scaleDownGroupName != null) + { + return false; + } + } + else if (!scaleDownGroupName.equals(other.scaleDownGroupName)) + { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java new file mode 100644 index 0000000..2be3e02 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java @@ -0,0 +1,205 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + + */ +public class CreateQueueMessage extends PacketImpl +{ + + private SimpleString address; + + private SimpleString queueName; + + private SimpleString filterString; + + private boolean durable; + + private boolean temporary; + + private boolean requiresResponse; + + public CreateQueueMessage(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final boolean requiresResponse) + { + this(); + + this.address = address; + this.queueName = queueName; + this.filterString = filterString; + this.durable = durable; + this.temporary = temporary; + this.requiresResponse = requiresResponse; + } + + public CreateQueueMessage() + { + super(CREATE_QUEUE); + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", address=" + address); + buff.append(", queueName=" + queueName); + buff.append(", filterString=" + filterString); + buff.append(", durable=" + durable); + buff.append(", temporary=" + temporary); + buff.append("]"); + return buff.toString(); + } + + public SimpleString getAddress() + { + return address; + } + + public SimpleString getQueueName() + { + return queueName; + } + + public SimpleString getFilterString() + { + return filterString; + } + + public boolean isDurable() + { + return durable; + } + + public boolean isTemporary() + { + return temporary; + } + + public boolean isRequiresResponse() + { + return requiresResponse; + } + + public void setAddress(SimpleString address) + { + this.address = address; + } + + public void setQueueName(SimpleString queueName) + { + this.queueName = queueName; + } + + public void setFilterString(SimpleString filterString) + { + this.filterString = filterString; + } + + public void setDurable(boolean durable) + { + this.durable = durable; + } + + public void setTemporary(boolean temporary) + { + this.temporary = temporary; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeSimpleString(address); + buffer.writeSimpleString(queueName); + buffer.writeNullableSimpleString(filterString); + buffer.writeBoolean(durable); + buffer.writeBoolean(temporary); + buffer.writeBoolean(requiresResponse); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + address = buffer.readSimpleString(); + queueName = buffer.readSimpleString(); + filterString = buffer.readNullableSimpleString(); + durable = buffer.readBoolean(); + temporary = buffer.readBoolean(); + requiresResponse = buffer.readBoolean(); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((address == null) ? 0 : address.hashCode()); + result = prime * result + (durable ? 1231 : 1237); + result = prime * result + ((filterString == null) ? 0 : filterString.hashCode()); + result = prime * result + ((queueName == null) ? 0 : queueName.hashCode()); + result = prime * result + (requiresResponse ? 1231 : 1237); + result = prime * result + (temporary ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof CreateQueueMessage)) + return false; + CreateQueueMessage other = (CreateQueueMessage)obj; + if (address == null) + { + if (other.address != null) + return false; + } + else if (!address.equals(other.address)) + return false; + if (durable != other.durable) + return false; + if (filterString == null) + { + if (other.filterString != null) + return false; + } + else if (!filterString.equals(other.filterString)) + return false; + if (queueName == null) + { + if (other.queueName != null) + return false; + } + else if (!queueName.equals(other.queueName)) + return false; + if (requiresResponse != other.requiresResponse) + return false; + if (temporary != other.temporary) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java new file mode 100644 index 0000000..ad871b3 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java @@ -0,0 +1,272 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a>. + * + */ +public class CreateSessionMessage extends PacketImpl +{ + private String name; + + private long sessionChannelID; + + private int version; + + private String username; + + private String password; + + private int minLargeMessageSize; + + private boolean xa; + + private boolean autoCommitSends; + + private boolean autoCommitAcks; + + private boolean preAcknowledge; + + private int windowSize; + + private String defaultAddress; + + public CreateSessionMessage(final String name, + final long sessionChannelID, + final int version, + final String username, + final String password, + final int minLargeMessageSize, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final int windowSize, + final String defaultAddress) + { + super(CREATESESSION); + + this.name = name; + + this.sessionChannelID = sessionChannelID; + + this.version = version; + + this.username = username; + + this.password = password; + + this.minLargeMessageSize = minLargeMessageSize; + + this.xa = xa; + + this.autoCommitSends = autoCommitSends; + + this.autoCommitAcks = autoCommitAcks; + + this.windowSize = windowSize; + + this.preAcknowledge = preAcknowledge; + + this.defaultAddress = defaultAddress; + } + + public CreateSessionMessage() + { + super(CREATESESSION); + } + + // Public -------------------------------------------------------- + + public String getName() + { + return name; + } + + public long getSessionChannelID() + { + return sessionChannelID; + } + + public int getVersion() + { + return version; + } + + public String getUsername() + { + return username; + } + + public String getPassword() + { + return password; + } + + public boolean isXA() + { + return xa; + } + + public boolean isAutoCommitSends() + { + return autoCommitSends; + } + + public boolean isAutoCommitAcks() + { + return autoCommitAcks; + } + + public boolean isPreAcknowledge() + { + return preAcknowledge; + } + + public int getWindowSize() + { + return windowSize; + } + + public String getDefaultAddress() + { + return defaultAddress; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeString(name); + buffer.writeLong(sessionChannelID); + buffer.writeInt(version); + buffer.writeNullableString(username); + buffer.writeNullableString(password); + buffer.writeInt(minLargeMessageSize); + buffer.writeBoolean(xa); + buffer.writeBoolean(autoCommitSends); + buffer.writeBoolean(autoCommitAcks); + buffer.writeInt(windowSize); + buffer.writeBoolean(preAcknowledge); + buffer.writeNullableString(defaultAddress); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + name = buffer.readString(); + sessionChannelID = buffer.readLong(); + version = buffer.readInt(); + username = buffer.readNullableString(); + password = buffer.readNullableString(); + minLargeMessageSize = buffer.readInt(); + xa = buffer.readBoolean(); + autoCommitSends = buffer.readBoolean(); + autoCommitAcks = buffer.readBoolean(); + windowSize = buffer.readInt(); + preAcknowledge = buffer.readBoolean(); + defaultAddress = buffer.readNullableString(); + } + + @Override + public final boolean isRequiresConfirmations() + { + return false; + } + + public int getMinLargeMessageSize() + { + return minLargeMessageSize; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (autoCommitAcks ? 1231 : 1237); + result = prime * result + (autoCommitSends ? 1231 : 1237); + result = prime * result + ((defaultAddress == null) ? 0 : defaultAddress.hashCode()); + result = prime * result + minLargeMessageSize; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((password == null) ? 0 : password.hashCode()); + result = prime * result + (preAcknowledge ? 1231 : 1237); + result = prime * result + (int)(sessionChannelID ^ (sessionChannelID >>> 32)); + result = prime * result + ((username == null) ? 0 : username.hashCode()); + result = prime * result + version; + result = prime * result + windowSize; + result = prime * result + (xa ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof CreateSessionMessage)) + return false; + CreateSessionMessage other = (CreateSessionMessage)obj; + if (autoCommitAcks != other.autoCommitAcks) + return false; + if (autoCommitSends != other.autoCommitSends) + return false; + if (defaultAddress == null) + { + if (other.defaultAddress != null) + return false; + } + else if (!defaultAddress.equals(other.defaultAddress)) + return false; + if (minLargeMessageSize != other.minLargeMessageSize) + return false; + if (name == null) + { + if (other.name != null) + return false; + } + else if (!name.equals(other.name)) + return false; + if (password == null) + { + if (other.password != null) + return false; + } + else if (!password.equals(other.password)) + return false; + if (preAcknowledge != other.preAcknowledge) + return false; + if (sessionChannelID != other.sessionChannelID) + return false; + if (username == null) + { + if (other.username != null) + return false; + } + else if (!username.equals(other.username)) + return false; + if (version != other.version) + return false; + if (windowSize != other.windowSize) + return false; + if (xa != other.xa) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java new file mode 100644 index 0000000..76a5f54 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java @@ -0,0 +1,91 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a>. + * + */ +public class CreateSessionResponseMessage extends PacketImpl +{ + private int serverVersion; + + public CreateSessionResponseMessage(final int serverVersion) + { + super(CREATESESSION_RESP); + + this.serverVersion = serverVersion; + } + + public CreateSessionResponseMessage() + { + super(CREATESESSION_RESP); + } + + @Override + public boolean isResponse() + { + return true; + } + + public int getServerVersion() + { + return serverVersion; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeInt(serverVersion); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + serverVersion = buffer.readInt(); + } + + @Override + public final boolean isRequiresConfirmations() + { + return false; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + serverVersion; + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof CreateSessionResponseMessage)) + return false; + CreateSessionResponseMessage other = (CreateSessionResponseMessage)obj; + if (serverVersion != other.serverVersion) + return false; + return true; + } +}
