http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConnectionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConnectionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConnectionInfo.java index 618fc43..8a30711 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConnectionInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConnectionInfo.java @@ -16,25 +16,50 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="3" */ +@OpenWireType(typeCode = 3) public class ConnectionInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_INFO; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConnectionId connectionId; + + @OpenWireProperty(version = 1, sequence = 2) protected String clientId; - protected String clientIp; - protected String userName; + + @OpenWireProperty(version = 1, sequence = 3) protected String password; + + @OpenWireProperty(version = 1, sequence = 4) + protected String userName; + + @OpenWireProperty(version = 1, sequence = 5, cached = true) protected BrokerId[] brokerPath; + + @OpenWireProperty(version = 1, sequence = 6) protected boolean brokerMasterConnector; + + @OpenWireProperty(version = 1, sequence = 7) protected boolean manageable; + + @OpenWireProperty(version = 2, sequence = 8) protected boolean clientMaster = true; + + @OpenWireProperty(version = 6, sequence = 9) protected boolean faultTolerant = false; + + @OpenWireProperty(version = 6, sequence = 10) protected boolean failoverReconnect; + @OpenWireProperty(version = 8, sequence = 11) + protected String clientIp; + public ConnectionInfo() { }
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerControl.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerControl.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerControl.java index b85d224..5c254c1 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerControl.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerControl.java @@ -16,22 +16,39 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Used by the Broker to control various aspects of a consumer instance. * * @openwire:marshaller code="17" */ +@OpenWireType(typeCode = 17) public class ConsumerControl extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_CONTROL; - protected ConsumerId consumerId; + @OpenWireProperty(version = 6, sequence = 1) + protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 2) protected boolean close; - protected boolean stop; - protected boolean start; - protected boolean flush; + + @OpenWireProperty(version = 1, sequence = 3) + protected ConsumerId consumerId; + + @OpenWireProperty(version = 1, sequence = 4) protected int prefetch; - protected OpenWireDestination destination; + + @OpenWireProperty(version = 2, sequence = 5) + protected boolean flush; + + @OpenWireProperty(version = 2, sequence = 6) + protected boolean start; + + @OpenWireProperty(version = 2, sequence = 7) + protected boolean stop; /** * @openwire:property version=6 http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerId.java index 8556f75..a35470b 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerId.java @@ -16,19 +16,34 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="122" */ +@OpenWireType(typeCode = 122) public class ConsumerId implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_ID; + @OpenWireProperty(version = 1, sequence = 1) protected String connectionId; + + @OpenWireProperty(version = 1, sequence = 2) protected long sessionId; + + @OpenWireProperty(version = 1, sequence = 3) protected long value; + @OpenWireExtension protected transient int hashCode; + + @OpenWireExtension protected transient String key; + + @OpenWireExtension protected transient SessionId parentId; public ConsumerId() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerInfo.java index 0fae7d4..5f426ad 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ConsumerInfo.java @@ -19,9 +19,14 @@ package org.apache.activemq.openwire.commands; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller code="5" */ +@OpenWireType(typeCode = 5) public class ConsumerInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO; @@ -31,28 +36,65 @@ public class ConsumerInfo extends BaseCommand { public static final byte NETWORK_CONSUMER_PRIORITY = -5; public static final byte LOW_PRIORITY = -10; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConsumerId consumerId; + + @OpenWireProperty(version = 1, sequence = 2) + protected boolean browser; + + @OpenWireProperty(version = 1, sequence = 3, cached = true) protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 4) protected int prefetchSize; + + @OpenWireProperty(version = 1, sequence = 5) protected int maximumPendingMessageLimit; - protected boolean browser; + + @OpenWireProperty(version = 1, sequence = 6) protected boolean dispatchAsync; + + @OpenWireProperty(version = 1, sequence = 7) protected String selector; + + @OpenWireProperty(version = 10, sequence = 8) protected String clientId; + + @OpenWireProperty(version = 1, sequence = 9) protected String subscriptionName; + + @OpenWireProperty(version = 1, sequence = 10) protected boolean noLocal; + + @OpenWireProperty(version = 1, sequence = 11) protected boolean exclusive; + + @OpenWireProperty(version = 1, sequence = 12) protected boolean retroactive; + + @OpenWireProperty(version = 1, sequence = 13) protected byte priority; + + @OpenWireProperty(version = 1, sequence = 14, cached = true) protected BrokerId[] brokerPath; + + @OpenWireProperty(version = 1, sequence = 15) + protected Object additionalPredicate; + + @OpenWireProperty(version = 1, sequence = 16, serialized = false) + protected boolean networkSubscription; + + @OpenWireProperty(version = 1, sequence = 17) protected boolean optimizedAcknowledge; + + @OpenWireProperty(version = 1, sequence = 18) protected boolean noRangeAcks; - // Network connector values should not be serialized. - protected transient boolean networkSubscription; - protected transient List<ConsumerId> networkConsumerIds; + @OpenWireProperty(version = 4, sequence = 19, serialized = false) + protected transient ConsumerId[] networkConsumerPath; - protected Object additionalPredicate; + @OpenWireExtension(serialized = false) + protected transient List<ConsumerId> networkConsumerIds; public ConsumerInfo() { } @@ -88,6 +130,12 @@ public class ConsumerInfo extends BaseCommand { info.priority = priority; info.brokerPath = brokerPath; info.networkSubscription = networkSubscription; + if (networkConsumerIds != null) { + if (info.networkConsumerIds == null) { + info.networkConsumerIds = new ArrayList<ConsumerId>(); + } + info.networkConsumerIds.addAll(networkConsumerIds); + } } public boolean isDurable() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ControlCommand.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ControlCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ControlCommand.java index 546e19c..61feb6e 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ControlCommand.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ControlCommand.java @@ -16,15 +16,20 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Used by the Broker to send a specific named command to the client. * * @openwire:marshaller code="14" */ +@OpenWireType(typeCode = 14) public class ControlCommand extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONTROL_COMMAND; + @OpenWireProperty(version = 1, sequence = 1) private String command; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataArrayResponse.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataArrayResponse.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataArrayResponse.java index 9ae776a..985ca7f 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataArrayResponse.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataArrayResponse.java @@ -16,13 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="33" */ +@OpenWireType(typeCode = 33) public class DataArrayResponse extends Response { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DATA_ARRAY_RESPONSE; + @OpenWireProperty(version = 1, sequence = 1) DataStructure data[]; public DataArrayResponse() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataResponse.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataResponse.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataResponse.java index af0a16f..71fdfe6 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataResponse.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DataResponse.java @@ -16,13 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="32" */ +@OpenWireType(typeCode = 32) public class DataResponse extends Response { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DATA_RESPONSE; + @OpenWireProperty(version = 1, sequence = 1) DataStructure data; public DataResponse() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DestinationInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DestinationInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DestinationInfo.java index 8251482..322aa0b 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DestinationInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DestinationInfo.java @@ -18,11 +18,15 @@ package org.apache.activemq.openwire.commands; import java.io.IOException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Used to create and destroy destinations on the broker. * * @openwire:marshaller code="8" */ +@OpenWireType(typeCode = 8) public class DestinationInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DESTINATION_INFO; @@ -30,10 +34,19 @@ public class DestinationInfo extends BaseCommand { public static final byte ADD_OPERATION_TYPE = 0; public static final byte REMOVE_OPERATION_TYPE = 1; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConnectionId connectionId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 3) protected byte operationType; + + @OpenWireProperty(version = 1, sequence = 4) protected long timeout; + + @OpenWireProperty(version = 1, sequence = 5) protected BrokerId[] brokerPath; public DestinationInfo() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DiscoveryEvent.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DiscoveryEvent.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DiscoveryEvent.java index 590ed5e..188d355 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DiscoveryEvent.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/DiscoveryEvent.java @@ -16,16 +16,23 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Represents a discovery event containing the details of the service * * @openwire:marshaller code="40" */ +@OpenWireType(typeCode = 40) public class DiscoveryEvent implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DISCOVERY_EVENT; + @OpenWireProperty(version = 1, sequence = 1) protected String serviceName; + + @OpenWireProperty(version = 1, sequence = 2) protected String brokerName; public DiscoveryEvent() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java index 07789ac..e99ff81 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java @@ -16,13 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="31" */ +@OpenWireType(typeCode = 31) public class ExceptionResponse extends Response { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.EXCEPTION_RESPONSE; + @OpenWireProperty(version = 1, sequence = 1) Throwable exception; public ExceptionResponse() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java index 6625ff7..c238ae6 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java @@ -16,11 +16,14 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * An indication to the transport layer that a flush is required. * * @openwire:marshaller code="15" */ +@OpenWireType(typeCode = 15) public class FlushCommand extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.FLUSH_COMMAND; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java index f33dc02..0be4fd2 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java @@ -16,13 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="34" */ +@OpenWireType(typeCode = 34) public class IntegerResponse extends Response { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.INTEGER_RESPONSE; + @OpenWireProperty(version = 1, sequence = 1) int result; public IntegerResponse() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java index 4594461..b70d765 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java @@ -16,14 +16,21 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="52" */ +@OpenWireType(typeCode = 52) public class JournalQueueAck implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_REMOVE; + @OpenWireProperty(version = 1, sequence = 1) OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 2) MessageAck messageAck; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java index 032dff0..e3ea315 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java @@ -16,18 +16,33 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="50" */ +@OpenWireType(typeCode = 50) public class JournalTopicAck implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_ACK; + @OpenWireProperty(version = 1, sequence = 1) protected OpenWireDestination destination; - protected String clientId; - protected String subscritionName; + + @OpenWireProperty(version = 1, sequence = 2) protected MessageId messageId; + + @OpenWireProperty(version = 1, sequence = 3) protected long messageSequenceId; + + @OpenWireProperty(version = 1, sequence = 4) + protected String subscritionName; + + @OpenWireProperty(version = 1, sequence = 5) + protected String clientId; + + @OpenWireProperty(version = 1, sequence = 6) protected TransactionId transactionId; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java index ebc4746..b7b85b3 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java @@ -16,13 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="53" */ +@OpenWireType(typeCode = 53) public class JournalTrace implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRACE; + @OpenWireProperty(version = 1, sequence = 1) private String message; public JournalTrace() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java index 8df4ccf..a1ddc4f 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java @@ -16,9 +16,13 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="54" */ +@OpenWireType(typeCode = 54) public class JournalTransaction implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRANSACTION; @@ -29,9 +33,14 @@ public class JournalTransaction implements DataStructure { public static final byte LOCAL_COMMIT = 4; public static final byte LOCAL_ROLLBACK = 5; + @OpenWireProperty(version = 1, sequence = 1) + public TransactionId transactionId; + + @OpenWireProperty(version = 1, sequence = 2) public byte type; + + @OpenWireProperty(version = 1, sequence = 3) public boolean wasPrepared; - public TransactionId transactionId; public JournalTransaction(byte type, TransactionId transactionId, boolean wasPrepared) { this.type = type; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java index 270475b..96a68dd 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller code="10" */ +@OpenWireType(typeCode = 10) public class KeepAliveInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.KEEP_ALIVE_INFO; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java index cc49d63..c6540fc 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java @@ -16,11 +16,14 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * Represents the end marker of a stream of {@link PartialCommand} instances. * * @openwire:marshaller code="61" */ +@OpenWireType(typeCode = 61) public class LastPartialCommand extends PartialCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_LAST_COMMAND; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java index ed5f5d4..34f71b5 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java @@ -16,18 +16,29 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="111" * */ +@OpenWireType(typeCode = 111) public class LocalTransactionId extends TransactionId implements Comparable<LocalTransactionId> { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_LOCAL_TRANSACTION_ID; - protected ConnectionId connectionId; + @OpenWireProperty(version = 1, sequence = 1) protected long value; + @OpenWireProperty(version = 1, sequence = 2, cached = true) + protected ConnectionId connectionId; + + @OpenWireExtension private transient String transactionKey; + + @OpenWireExtension private transient int hashCode; public LocalTransactionId() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java index b691f93..3c6c57b 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java @@ -30,6 +30,9 @@ import java.util.zip.InflaterInputStream; import javax.jms.JMSException; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; +import org.apache.activemq.openwire.annotations.OpenWireType; import org.apache.activemq.openwire.codec.OpenWireFormat; import org.apache.activemq.openwire.utils.ExceptionSupport; import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport; @@ -43,6 +46,7 @@ import org.fusesource.hawtbuf.UTF8Buffer; * * @openwire:marshaller */ +@OpenWireType(typeCode = 0, marshalAware = true) public abstract class Message extends BaseCommand implements MarshallAware { public static final String ORIGINAL_EXPIRATION = "originalExpiration"; @@ -52,44 +56,102 @@ public abstract class Message extends BaseCommand implements MarshallAware { */ public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; - protected MessageId messageId; - protected OpenWireDestination originalDestination; - protected TransactionId originalTransactionId; - + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ProducerId producerId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 3, cached = true) protected TransactionId transactionId; - protected long expiration; - protected long timestamp; - protected long arrival; - protected long brokerInTime; - protected long brokerOutTime; + @OpenWireProperty(version = 1, sequence = 4, cached = true) + protected OpenWireDestination originalDestination; + + @OpenWireProperty(version = 1, sequence = 5) + protected MessageId messageId; + + @OpenWireProperty(version = 1, sequence = 6, cached = true) + protected TransactionId originalTransactionId; + + @OpenWireProperty(version = 1, sequence = 7) + protected String groupId; + + @OpenWireProperty(version = 1, sequence = 8) + protected int groupSequence; + + @OpenWireProperty(version = 1, sequence = 9) protected String correlationId; - protected OpenWireDestination replyTo; + + @OpenWireProperty(version = 1, sequence = 10) protected boolean persistent; - protected String type; + + @OpenWireProperty(version = 1, sequence = 11) + protected long expiration; + + @OpenWireProperty(version = 1, sequence = 12) protected byte priority; - protected String groupId; - protected int groupSequence; - protected ConsumerId targetConsumerId; - protected boolean compressed; - protected String userId; + @OpenWireProperty(version = 1, sequence = 13) + protected OpenWireDestination replyTo; + + @OpenWireProperty(version = 1, sequence = 14) + protected long timestamp; + + @OpenWireProperty(version = 1, sequence = 15) + protected String type; + + @OpenWireProperty(version = 1, sequence = 16) protected Buffer content; + + @OpenWireProperty(version = 1, sequence = 17) protected Buffer marshalledProperties; + + @OpenWireProperty(version = 1, sequence = 18) protected DataStructure dataStructure; + + @OpenWireProperty(version = 1, sequence = 19, cached = true) + protected ConsumerId targetConsumerId; + + @OpenWireProperty(version = 1, sequence = 20) + protected boolean compressed; + + @OpenWireProperty(version = 1, sequence = 21) protected int redeliveryCounter; - protected int size; - protected Map<String, Object> properties; + @OpenWireProperty(version = 1, sequence = 22, cached = true) + private BrokerId[] brokerPath; + + @OpenWireProperty(version = 1, sequence = 23) + protected long arrival; + + @OpenWireProperty(version = 1, sequence = 24) + protected String userId; + + @OpenWireProperty(version = 1, sequence = 25, serialized = false) protected transient boolean recievedByDFBridge; + + @OpenWireProperty(version = 2, sequence = 26, cached = true) protected boolean droppable; - protected boolean jmsXGroupFirstForConsumer; - private BrokerId[] brokerPath; + @OpenWireProperty(version = 3, sequence = 27, cached = true) private BrokerId[] cluster; + @OpenWireProperty(version = 3, sequence = 28) + protected long brokerInTime; + + @OpenWireProperty(version = 3, sequence = 29) + protected long brokerOutTime; + + @OpenWireProperty(version = 10, sequence = 30) + protected boolean jmsXGroupFirstForConsumer; + + @OpenWireExtension(serialized = true) + protected int size; + + @OpenWireExtension(serialized = true) + protected Map<String, Object> properties; + public abstract Message copy(); public abstract void clearBody() throws JMSException; public abstract void storeContent(); http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java index 783b048..7c93914 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java @@ -16,9 +16,14 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="22" */ +@OpenWireType(typeCode = 22) public class MessageAck extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; @@ -64,15 +69,31 @@ public class MessageAck extends BaseCommand { */ public static final byte EXPIRED_ACK_TYPE = 6; - protected byte ackType; + @OpenWireProperty(version = 1, sequence = 1, cached = true) + protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) + protected TransactionId transactionId; + + @OpenWireProperty(version = 1, sequence = 3, cached = true) protected ConsumerId consumerId; + + @OpenWireProperty(version = 1, sequence = 4) + protected byte ackType; + + @OpenWireProperty(version = 1, sequence = 5) protected MessageId firstMessageId; + + @OpenWireProperty(version = 1, sequence = 6) protected MessageId lastMessageId; - protected OpenWireDestination destination; - protected TransactionId transactionId; + + @OpenWireProperty(version = 1, sequence = 7) protected int messageCount; + + @OpenWireProperty(version = 7, sequence = 8) protected Throwable poisonCause; + @OpenWireExtension protected transient String consumerKey; public MessageAck() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java index 6dfe04d..01b28b4 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java @@ -16,16 +16,27 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="21" */ +@OpenWireType(typeCode = 21) public class MessageDispatch extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConsumerId consumerId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 3) protected Message message; + + @OpenWireProperty(version = 1, sequence = 4) protected int redeliveryCounter; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java index 91ae1aa..98ed7ac 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java @@ -16,18 +16,29 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="90" */ +@OpenWireType(typeCode = 90) public class MessageDispatchNotification extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH_NOTIFICATION; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConsumerId consumerId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected OpenWireDestination destination; - protected MessageId messageId; + + @OpenWireProperty(version = 1, sequence = 3) protected long deliverySequenceId; + @OpenWireProperty(version = 1, sequence = 4) + protected MessageId messageId; + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java index 4f3b13e..dde67de 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java @@ -16,20 +16,34 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; /** * @openwire:marshaller code="110" */ +@OpenWireType(typeCode = 110) public class MessageId implements DataStructure, Comparable<MessageId> { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID; + @OpenWireProperty(version = 10, sequence = 1) protected String textView; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected ProducerId producerId; + + @OpenWireProperty(version = 1, sequence = 3) protected long producerSequenceId; + + @OpenWireProperty(version = 1, sequence = 4) protected long brokerSequenceId; + @OpenWireExtension private transient String key; + + @OpenWireExtension private transient int hashCode; public MessageId() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java index 29f5c01..b8cd970 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Used to pull messages on demand, the command can have a time value that indicates * how long the Broker keeps the pull request open before returning a MessageDispatch @@ -23,18 +27,35 @@ package org.apache.activemq.openwire.commands; * * @openwire:marshaller code="20" */ +@OpenWireType(typeCode = 20) public class MessagePull extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConsumerId consumerId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 3) protected long timeout; - private MessageId messageId; + + @OpenWireProperty(version = 3, sequence = 4) private String correlationId; + @OpenWireProperty(version = 4, sequence = 5) + private MessageId messageId; + + @OpenWireExtension private transient boolean tracked = false; + @OpenWireExtension + private transient int quantity = 1; + + @OpenWireExtension + private transient boolean alwaysSignalDone; + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java index f39286a..95766cd 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java @@ -16,20 +16,28 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @openwire:marshaller code="91" */ +@OpenWireType(typeCode = 91) public class NetworkBridgeFilter implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER; static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class); + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected BrokerId networkBrokerId; + + @OpenWireProperty(version = 10, sequence = 2) protected int messageTTL; + + @OpenWireProperty(version = 10, sequence = 3) protected int consumerTTL; public NetworkBridgeFilter() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java index 1c3f191..561793e 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java @@ -23,24 +23,37 @@ import java.net.URL; import javax.jms.JMSException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * An implementation of ActiveMQ's BlobMessage for out of band BLOB transfer * * openwire:marshaller code="29" */ +@OpenWireType(typeCode = 29, version = 3) public class OpenWireBlobMessage extends OpenWireMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_BLOB_MESSAGE; public static final String BINARY_MIME_TYPE = "application/octet-stream"; + @OpenWireProperty(version = 3, sequence = 1) private String remoteBlobUrl; + + @OpenWireProperty(version = 3, sequence = 2, cached = true) private String mimeType; - private String name; + + @OpenWireProperty(version = 3, sequence = 3) private boolean deletedByBroker; + @OpenWireExtension private transient URL url; + @OpenWireExtension(serialized = true) + private String name; + @Override public OpenWireBlobMessage copy() { OpenWireBlobMessage copy = new OpenWireBlobMessage(); @@ -97,8 +110,6 @@ public class OpenWireBlobMessage extends OpenWireMessage { /** * The name of the attachment which can be useful information if transmitting files over * ActiveMQ - * - * @openwire:property version=3 cache=false */ public void setName(String name) { this.name = name; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java index e7ea14c..33f922c 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBytesMessage.java @@ -24,6 +24,7 @@ import java.util.zip.Inflater; import javax.jms.JMSException; import javax.jms.MessageNotReadableException; +import org.apache.activemq.openwire.annotations.OpenWireType; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.BufferEditor; import org.fusesource.hawtbuf.ByteArrayOutputStream; @@ -36,6 +37,7 @@ import org.fusesource.hawtbuf.ByteArrayOutputStream; * * openwire:marshaller code=24 */ +@OpenWireType(typeCode = 24) public class OpenWireBytesMessage extends OpenWireMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_BYTES_MESSAGE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireDestination.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireDestination.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireDestination.java index 1b3d57f..04bb1f3 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireDestination.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireDestination.java @@ -33,6 +33,9 @@ import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.Topic; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; import org.apache.activemq.openwire.utils.DefaultUnresolvedDestinationTransformer; import org.apache.activemq.openwire.utils.UnresolvedDestinationTransformer; @@ -42,6 +45,7 @@ import org.apache.activemq.openwire.utils.UnresolvedDestinationTransformer; * * @openwire:marshaller */ +@OpenWireType(typeCode = 0) public abstract class OpenWireDestination implements Destination, DataStructure, Comparable<OpenWireDestination> { public static final String PATH_SEPERATOR = "."; @@ -60,12 +64,22 @@ public abstract class OpenWireDestination implements Destination, DataStructure, public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:"; + @OpenWireProperty(version = 1, sequence = 1) protected String physicalName; + @OpenWireExtension protected transient OpenWireDestination[] compositeDestinations; + + @OpenWireExtension protected transient String[] destinationPaths; + + @OpenWireExtension protected transient boolean isPattern; + + @OpenWireExtension protected transient int hashValue; + + @OpenWireExtension(serialized = true) protected Map<String, String> options; protected static UnresolvedDestinationTransformer unresolvableDestinationTransformer = new DefaultUnresolvedDestinationTransformer(); http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java index b828aea..656bfce 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMapMessage.java @@ -33,6 +33,8 @@ import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; import org.apache.activemq.openwire.codec.OpenWireFormat; import org.apache.activemq.openwire.utils.ExceptionSupport; import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport; @@ -44,10 +46,12 @@ import org.fusesource.hawtbuf.UTF8Buffer; /** * openwire:marshaller code="25" */ +@OpenWireType(typeCode = 25) public class OpenWireMapMessage extends OpenWireMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_MAP_MESSAGE; + @OpenWireExtension protected transient Map<String, Object> map = new HashMap<String, Object>(); private Object readResolve() throws ObjectStreamException { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java index 12e1d09..ce39730 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireMessage.java @@ -26,6 +26,8 @@ import java.util.Vector; import javax.jms.JMSException; import javax.jms.MessageFormatException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; import org.apache.activemq.openwire.utils.ExceptionSupport; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; @@ -35,11 +37,15 @@ import org.fusesource.hawtbuf.UTF8Buffer; * * openwire:marshaller code="23" */ +@OpenWireType(typeCode = 23) public class OpenWireMessage extends Message { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_MESSAGE; + @OpenWireExtension protected transient boolean useCompression; + + @OpenWireExtension protected transient boolean nestedMapAndListAllowed; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java index 4c4a74d..8480234 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireObjectMessage.java @@ -29,6 +29,8 @@ import java.util.zip.InflaterInputStream; import javax.jms.JMSException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; import org.apache.activemq.openwire.codec.OpenWireFormat; import org.apache.activemq.openwire.utils.ExceptionSupport; import org.apache.activemq.openwire.utils.ObjectMessageInputStream; @@ -39,11 +41,13 @@ import org.fusesource.hawtbuf.ByteArrayOutputStream; /** * openwire:marshaller code="26" */ +@OpenWireType(typeCode = 26) public class OpenWireObjectMessage extends OpenWireMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_OBJECT_MESSAGE; static final ClassLoader ACTIVEMQ_CLASSLOADER = OpenWireObjectMessage.class.getClassLoader(); + @OpenWireExtension protected transient Serializable object; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireQueue.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireQueue.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireQueue.java index 67b87e1..ab03b58 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireQueue.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireQueue.java @@ -19,10 +19,13 @@ package org.apache.activemq.openwire.commands; import javax.jms.JMSException; import javax.jms.Queue; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller code="100" */ +@OpenWireType(typeCode = 100) public class OpenWireQueue extends OpenWireDestination implements Queue { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_QUEUE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java index fa0660f..d848e0e 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireStreamMessage.java @@ -26,6 +26,7 @@ import java.util.List; import javax.jms.JMSException; +import org.apache.activemq.openwire.annotations.OpenWireType; import org.apache.activemq.openwire.utils.ExceptionSupport; import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport; import org.fusesource.hawtbuf.Buffer; @@ -35,6 +36,7 @@ import org.fusesource.hawtbuf.DataByteArrayOutputStream; /** * openwire:marshaller code="27" */ +@OpenWireType(typeCode = 27) public class OpenWireStreamMessage extends OpenWireMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_STREAM_MESSAGE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempDestination.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempDestination.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempDestination.java index f9ba499..9f07ec3 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempDestination.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempDestination.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,11 +26,15 @@ import org.slf4j.LoggerFactory; * * @openwire:marshaller */ +@OpenWireType(typeCode = 0) public abstract class OpenWireTempDestination extends OpenWireDestination { private static final Logger LOG = LoggerFactory.getLogger(OpenWireTempDestination.class); + @OpenWireExtension protected transient String connectionId; + + @OpenWireExtension protected transient int sequenceId; public OpenWireTempDestination() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempQueue.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempQueue.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempQueue.java index 13f4c1b..b3cbed0 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempQueue.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempQueue.java @@ -19,12 +19,15 @@ package org.apache.activemq.openwire.commands; import javax.jms.JMSException; import javax.jms.TemporaryQueue; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * Represents an ActiveMQ Temporary Queue. * * @openwire:marshaller code="102" */ +@OpenWireType(typeCode = 102) public class OpenWireTempQueue extends OpenWireTempDestination implements TemporaryQueue { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_TEMP_QUEUE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempTopic.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempTopic.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempTopic.java index 321bd7a..05a5fe3 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempTopic.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTempTopic.java @@ -19,9 +19,12 @@ package org.apache.activemq.openwire.commands; import javax.jms.JMSException; import javax.jms.TemporaryTopic; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller code="103" */ +@OpenWireType(typeCode = 103) public class OpenWireTempTopic extends OpenWireTempDestination implements TemporaryTopic { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_TEMP_TOPIC; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java index 09c9223..de9d964 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTextMessage.java @@ -25,6 +25,8 @@ import java.io.OutputStream; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; import org.apache.activemq.openwire.codec.OpenWireFormat; import org.apache.activemq.openwire.utils.ExceptionSupport; import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport; @@ -35,10 +37,12 @@ import org.fusesource.hawtbuf.ByteArrayOutputStream; /** * openwire:marshaller code="28" */ +@OpenWireType(typeCode = 28) public class OpenWireTextMessage extends OpenWireMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_TEXT_MESSAGE; + @OpenWireExtension(serialized = true) protected String text; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTopic.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTopic.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTopic.java index 8bf6500..2bf503f 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTopic.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireTopic.java @@ -19,9 +19,12 @@ package org.apache.activemq.openwire.commands; import javax.jms.JMSException; import javax.jms.Topic; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller code="101" */ +@OpenWireType(typeCode = 101) public class OpenWireTopic extends OpenWireDestination implements Topic { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_TOPIC; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java index 030f3a8..6a69c24 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/PartialCommand.java @@ -16,17 +16,24 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Represents a partial command; a large command that has been split up into * pieces. * * @openwire:marshaller code="60" */ +@OpenWireType(typeCode = 60) public class PartialCommand implements Command { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND; + @OpenWireProperty(version = 1, sequence = 1) private int commandId; + + @OpenWireProperty(version = 1, sequence = 1, mandatory = true) private byte[] data; public PartialCommand() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java index b1bda9b..e01a187 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * A ProducerAck command is sent by a broker to a producer to let it know it has * received and processed messages that it has produced. The producer will be @@ -24,11 +27,15 @@ package org.apache.activemq.openwire.commands; * * @openwire:marshaller code="19" version="3" */ +@OpenWireType(typeCode = 19, version = 3) public class ProducerAck extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK; + @OpenWireProperty(version = 3, sequence = 1) protected ProducerId producerId; + + @OpenWireProperty(version = 3, sequence = 1) protected int size; public ProducerAck() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java index bd9283e..d934bbf 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java @@ -16,20 +16,35 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="123" * */ +@OpenWireType(typeCode = 123) public class ProducerId implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected String connectionId; - protected long sessionId; + + @OpenWireProperty(version = 1, sequence = 2) protected long value; + @OpenWireProperty(version = 1, sequence = 3) + protected long sessionId; + + @OpenWireExtension protected transient int hashCode; + + @OpenWireExtension protected transient String key; + + @OpenWireExtension protected transient SessionId parentId; public ProducerId() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java index 23758b0..878ccba 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java @@ -16,18 +16,31 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="6" */ +@OpenWireType(typeCode = 6) public class ProducerInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ProducerId producerId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 3, cached = true) protected BrokerId[] brokerPath; + + @OpenWireProperty(version = 2, sequence = 4) protected boolean dispatchAsync; + + @OpenWireProperty(version = 3, sequence = 5) protected int windowSize; public ProducerInfo() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java index 9aa4b5e..e739fba 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java @@ -18,16 +18,23 @@ package org.apache.activemq.openwire.commands; import java.io.IOException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Removes a consumer, producer, session or connection. * * @openwire:marshaller code="12" */ +@OpenWireType(typeCode = 12) public class RemoveInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected DataStructure objectId; + + @OpenWireProperty(version = 5, sequence = 2) protected long lastDeliveredSequenceId; public RemoveInfo() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java index 5e04db8..1346b79 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java @@ -16,17 +16,26 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="9" */ +@OpenWireType(typeCode = 9) public class RemoveSubscriptionInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConnectionId connectionId; - protected String clientId; + + @OpenWireProperty(version = 1, sequence = 2) protected String subscriptionName; + @OpenWireProperty(version = 1, sequence = 3) + protected String clientId; + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java index 9d1b4c8..a9b8050 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * A general purpose replay command for some kind of producer where ranges of * messages are asked to be replayed. This command is typically used over a @@ -24,15 +28,25 @@ package org.apache.activemq.openwire.commands; * * @openwire:marshaller code="65" */ +@OpenWireType(typeCode = 65) public class ReplayCommand extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY; + @OpenWireProperty(version = 1, sequence = 1) + private int firstNakNumber; + + @OpenWireProperty(version = 1, sequence = 2) + private int lastNakNumber; + + @OpenWireExtension(serialized = true) private String producerId; + + @OpenWireExtension(serialized = true) private int firstAckNumber; + + @OpenWireExtension(serialized = true) private int lastAckNumber; - private int firstNakNumber; - private int lastNakNumber; public ReplayCommand() { } @@ -48,8 +62,6 @@ public class ReplayCommand extends BaseCommand { /** * Is used to uniquely identify the producer of the sequence - * - * @openwire:property version=1 cache=false */ public void setProducerId(String producerId) { this.producerId = producerId; @@ -62,8 +74,6 @@ public class ReplayCommand extends BaseCommand { /** * Is used to specify the first sequence number being acknowledged as delivered on the transport * so that it can be removed from cache - * - * @openwire:property version=1 */ public void setFirstAckNumber(int firstSequenceNumber) { this.firstAckNumber = firstSequenceNumber; @@ -76,8 +86,6 @@ public class ReplayCommand extends BaseCommand { /** * Is used to specify the last sequence number being acknowledged as delivered on the transport * so that it can be removed from cache - * - * @openwire:property version=1 */ public void setLastAckNumber(int lastSequenceNumber) { this.lastAckNumber = lastSequenceNumber; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java index 15e8508..32102dd 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java @@ -16,12 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="30" */ +@OpenWireType(typeCode = 30) public class Response extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE; + + @OpenWireProperty(version = 1, sequence = 1) int correlationId; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java index d30b6b2..34a2b5f 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java @@ -16,18 +16,31 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="121" */ +@OpenWireType(typeCode = 121) public class SessionId implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected String connectionId; + + @OpenWireProperty(version = 1, sequence = 2) protected long value; + @OpenWireExtension protected transient int hashCode; + + @OpenWireExtension protected transient String key; + + @OpenWireExtension protected transient ConnectionId parentId; public SessionId() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java index 4064c3f..9b311bc 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java @@ -16,13 +16,18 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="4" */ +@OpenWireType(typeCode = 4) public class SessionInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected SessionId sessionId; public SessionInfo() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java index 72f67e0..b2a27c2 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller code="11" */ +@OpenWireType(typeCode = 11) public class ShutdownInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO; http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java index 99fc3bd..899ba86 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java @@ -16,20 +16,35 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * Used to represent a durable subscription. * * @openwire:marshaller code="55" */ +@OpenWireType(typeCode = 55) public class SubscriptionInfo implements DataStructure { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO; - protected OpenWireDestination subscribedDestination; - protected OpenWireDestination destination; + @OpenWireProperty(version = 1, sequence = 1) protected String clientId; - protected String subscriptionName; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) + protected OpenWireDestination destination; + + @OpenWireProperty(version = 1, sequence = 3) protected String selector; + + @OpenWireProperty(version = 1, sequence = 4) + protected String subscriptionName; + + @OpenWireProperty(version = 3, sequence = 5) + protected OpenWireDestination subscribedDestination; + + @OpenWireProperty(version = 11, sequence = 6) protected boolean noLocal; public SubscriptionInfo() { http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java index 71be56c..7592a13 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.openwire.commands; +import org.apache.activemq.openwire.annotations.OpenWireType; + /** * @openwire:marshaller */ +@OpenWireType(typeCode = 0) public abstract class TransactionId implements DataStructure { public abstract boolean isXATransaction(); http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java index 698d5e2..223044b 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java @@ -18,9 +18,13 @@ package org.apache.activemq.openwire.commands; import java.io.IOException; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireProperty; + /** * @openwire:marshaller code="7" */ +@OpenWireType(typeCode = 7) public class TransactionInfo extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO; @@ -34,10 +38,15 @@ public class TransactionInfo extends BaseCommand { public static final byte FORGET = 6; public static final byte END = 7; - protected byte type; + @OpenWireProperty(version = 1, sequence = 1, cached = true) protected ConnectionId connectionId; + + @OpenWireProperty(version = 1, sequence = 2, cached = true) protected TransactionId transactionId; + @OpenWireProperty(version = 1, sequence = 3) + protected byte type; + public TransactionInfo() { } http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java index f6ceca0..a205aeb 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java @@ -24,6 +24,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; +import org.apache.activemq.openwire.annotations.OpenWireType; import org.apache.activemq.openwire.codec.OpenWireFormat; import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport; import org.fusesource.hawtbuf.Buffer; @@ -34,16 +37,23 @@ import org.fusesource.hawtbuf.UTF8Buffer; /** * @openwire:marshaller code="1" */ +@OpenWireType(typeCode = 1, marshalAware = true) public class WireFormatInfo implements Command, MarshallAware { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO; private static final int MAX_PROPERTY_SIZE = 1024 * 4; private static final byte MAGIC[] = new byte[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' }; + @OpenWireProperty(version = 1, sequence = 1, size = 8) protected byte magic[] = MAGIC; + + @OpenWireProperty(version = 1, sequence = 2) protected int version; + + @OpenWireProperty(version = 1, sequence = 3) protected Buffer marshalledProperties; + @OpenWireExtension protected transient Map<String, Object> properties; @Override http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/0c90d2e3/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java index 8781386..6ed8551 100644 --- a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java @@ -21,23 +21,39 @@ import java.util.Arrays; import javax.transaction.xa.Xid; +import org.apache.activemq.openwire.annotations.OpenWireType; +import org.apache.activemq.openwire.annotations.OpenWireExtension; +import org.apache.activemq.openwire.annotations.OpenWireProperty; import org.fusesource.hawtbuf.DataByteArrayInputStream; import org.fusesource.hawtbuf.DataByteArrayOutputStream; /** * @openwire:marshaller code="112" */ +@OpenWireType(typeCode = 112) public class XATransactionId extends TransactionId implements Xid, Comparable<XATransactionId> { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_XA_TRANSACTION_ID; + @OpenWireProperty(version = 1, sequence = 1) private int formatId; - private byte[] branchQualifier; + + @OpenWireProperty(version = 1, sequence = 2) private byte[] globalTransactionId; + + @OpenWireProperty(version = 1, sequence = 3) + private byte[] branchQualifier; + + @OpenWireExtension private transient DataByteArrayOutputStream outputStream; + + @OpenWireExtension private transient byte[] encodedXidBytes; + @OpenWireExtension private transient int hash; + + @OpenWireExtension private transient String transactionKey; public XATransactionId() {
