ARTEMIS-1009 Pure Message Encoding. with this we could send and receive message in their raw format, without requiring conversions to Core.
- MessageImpl and ServerMessage are removed as part of this - AMQPMessage and CoreMessage will have the specialized message format for each protocol - The protocol manager is now responsible to send the message - The message will provide an encoder for journal and paging Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64681865 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64681865 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64681865 Branch: refs/heads/artemis-1009 Commit: 64681865090a77dc8ea7fe653379d06a67849ab8 Parents: 503b112 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Mon Feb 20 15:55:15 2017 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Mar 2 20:04:30 2017 -0500 ---------------------------------------------------------------------- .../artemis/cli/commands/tools/PrintData.java | 7 + .../cli/commands/tools/XmlDataExporter.java | 18 +- .../cli/commands/tools/XmlDataExporterUtil.java | 7 +- .../cli/commands/tools/XmlDataImporter.java | 3 +- .../org/apache/activemq/artemis/Closeable.java | 22 + .../artemis/api/core/ActiveMQBuffer.java | 13 + .../artemis/api/core/ActiveMQBuffers.java | 15 + .../activemq/artemis/api/core/SimpleString.java | 34 + .../core/buffers/impl/ChannelBufferWrapper.java | 83 +- .../artemis/core/persistence/Persister.java | 30 + .../apache/activemq/artemis/utils/ByteUtil.java | 8 + .../activemq/artemis/utils/TypedProperties.java | 74 +- .../apache/activemq/artemis/utils/UTF8Util.java | 36 +- .../artemis/utils/TypedPropertiesTest.java | 10 +- .../config/ActiveMQDefaultConfiguration.java | 20 - .../activemq/artemis/api/core/Message.java | 627 +++++----- .../artemis/api/core/RefCountMessage.java | 81 ++ .../api/core/RefCountMessageListener.java | 31 + .../artemis/api/core/client/ClientMessage.java | 8 - .../artemis/api/core/encode/BodyType.java | 22 + .../artemis/api/core/encode/MessageBody.java | 28 + .../impl/ResetLimitWrappedActiveMQBuffer.java | 24 +- .../client/impl/ClientLargeMessageImpl.java | 22 +- .../core/client/impl/ClientMessageImpl.java | 74 +- .../core/client/impl/ClientMessageInternal.java | 5 + .../core/client/impl/ClientProducerImpl.java | 41 +- .../CompressedLargeMessageControllerImpl.java | 6 + .../client/impl/LargeMessageControllerImpl.java | 15 + .../artemis/core/message/BodyEncoder.java | 55 - .../artemis/core/message/LargeBodyEncoder.java | 55 + .../artemis/core/message/impl/CoreMessage.java | 1066 ++++++++++++++++++ .../core/message/impl/CoreMessagePersister.java | 66 ++ .../artemis/core/message/impl/MessageImpl.java | 1059 ----------------- .../core/message/impl/MessageInternal.java | 57 - .../core/impl/ActiveMQSessionContext.java | 16 +- .../core/protocol/core/impl/ChannelImpl.java | 1 + .../core/protocol/core/impl/PacketImpl.java | 30 +- .../core/impl/RemotingConnectionImpl.java | 1 + .../core/impl/wireformat/MessagePacket.java | 18 +- .../SessionReceiveClientLargeMessage.java | 5 +- .../wireformat/SessionReceiveLargeMessage.java | 14 +- .../impl/wireformat/SessionReceiveMessage.java | 60 +- .../SessionSendContinuationMessage.java | 8 +- .../wireformat/SessionSendLargeMessage.java | 12 +- .../impl/wireformat/SessionSendMessage.java | 54 +- .../activemq/artemis/reader/MapMessageUtil.java | 4 +- .../spi/core/remoting/SessionContext.java | 11 +- .../artemis/message/CoreMessageTest.java | 365 ++++++ .../jdbc/store/journal/JDBCJournalImpl.java | 36 +- .../jdbc/store/journal/JDBCJournalRecord.java | 7 +- .../jms/client/ActiveMQBytesMessage.java | 4 +- .../artemis/jms/client/ActiveMQMessage.java | 8 +- .../jms/transaction/JMSTransactionDetail.java | 6 +- .../artemis/core/journal/EncoderPersister.java | 51 + .../activemq/artemis/core/journal/Journal.java | 55 +- .../journal/impl/AbstractJournalUpdateTask.java | 3 +- .../core/journal/impl/FileWrapperJournal.java | 26 +- .../artemis/core/journal/impl/JournalBase.java | 63 +- .../core/journal/impl/JournalCompactor.java | 9 +- .../artemis/core/journal/impl/JournalImpl.java | 62 +- .../impl/dataformat/JournalAddRecord.java | 20 +- .../impl/dataformat/JournalAddRecordTX.java | 17 +- .../protocol/amqp/broker/AMQPMessage.java | 761 +++++++++++++ .../amqp/broker/AMQPMessagePersister.java | 75 ++ .../amqp/broker/AMQPSessionCallback.java | 33 +- .../broker/ProtonProtocolManagerFactory.java | 14 + .../amqp/converter/ProtonMessageConverter.java | 12 +- .../converter/jms/ServerJMSBytesMessage.java | 8 +- .../amqp/converter/jms/ServerJMSMapMessage.java | 3 +- .../amqp/converter/jms/ServerJMSMessage.java | 21 +- .../converter/jms/ServerJMSObjectMessage.java | 3 +- .../converter/jms/ServerJMSStreamMessage.java | 5 +- .../converter/jms/ServerJMSTextMessage.java | 3 +- .../converter/message/AMQPMessageSupport.java | 28 +- .../message/AMQPNativeInboundTransformer.java | 44 - .../message/AMQPRawInboundTransformer.java | 62 - .../converter/message/InboundTransformer.java | 5 +- .../message/JMSMappingInboundTransformer.java | 34 +- .../message/JMSMappingOutboundTransformer.java | 3 +- .../amqp/proton/AMQPConnectionContext.java | 4 + .../proton/ProtonServerReceiverContext.java | 32 +- .../amqp/proton/ProtonServerSenderContext.java | 25 +- .../amqp/proton/ProtonTransactionHandler.java | 3 +- .../amqp/proton/handler/ProtonHandler.java | 2 +- .../protocol/amqp/util/DeliveryUtil.java | 13 - .../protocol/amqp/util/NettyReadable.java | 139 +++ .../amqp/converter/TestConversions.java | 62 +- .../JMSMappingInboundTransformerTest.java | 6 +- .../JMSMappingOutboundTransformerTest.java | 38 +- .../JMSTransformationSpeedComparisonTest.java | 36 +- .../message/MessageTransformationTest.java | 19 +- .../protocol/amqp/message/AMQPMessageTest.java | 63 ++ .../core/protocol/mqtt/MQTTPublishManager.java | 22 +- .../protocol/mqtt/MQTTRetainMessageManager.java | 8 +- .../core/protocol/mqtt/MQTTSessionCallback.java | 10 +- .../artemis/core/protocol/mqtt/MQTTUtil.java | 15 +- .../openwire/OpenWireMessageConverter.java | 20 +- .../core/protocol/openwire/amq/AMQConsumer.java | 6 +- .../core/protocol/openwire/amq/AMQSession.java | 9 +- .../protocol/openwire/util/OpenWireUtil.java | 7 +- .../ActiveMQStompProtocolMessageBundle.java | 3 +- .../core/protocol/stomp/StompConnection.java | 14 +- .../protocol/stomp/StompProtocolManager.java | 6 +- .../core/protocol/stomp/StompSession.java | 35 +- .../artemis/core/protocol/stomp/StompUtils.java | 6 +- .../stomp/VersionedStompFrameHandler.java | 19 +- .../stomp/v12/StompFrameHandlerV12.java | 4 +- .../artemis/core/config/Configuration.java | 8 - .../core/config/impl/ConfigurationImpl.java | 32 - .../deployers/impl/FileConfigurationParser.java | 4 - .../activemq/artemis/core/filter/Filter.java | 4 +- .../artemis/core/filter/impl/FilterImpl.java | 12 +- .../management/impl/AddressControlImpl.java | 6 +- .../core/management/impl/QueueControlImpl.java | 10 +- .../impl/openmbean/OpenTypeSupport.java | 16 +- .../artemis/core/paging/PagedMessage.java | 5 +- .../artemis/core/paging/PagingStore.java | 8 +- .../core/paging/cursor/PagedReferenceImpl.java | 16 +- .../cursor/impl/PageSubscriptionImpl.java | 4 +- .../activemq/artemis/core/paging/impl/Page.java | 2 +- .../core/paging/impl/PagedMessageImpl.java | 66 +- .../core/paging/impl/PagingStoreImpl.java | 52 +- .../core/persistence/StorageManager.java | 16 +- .../journal/AbstractJournalStorageManager.java | 60 +- .../impl/journal/AddMessageRecord.java | 8 +- .../impl/journal/DescribeJournal.java | 17 +- .../impl/journal/JournalRecordIds.java | 3 + .../impl/journal/JournalStorageManager.java | 14 +- .../journal/LargeMessageTXFailureCallback.java | 6 +- .../impl/journal/LargeServerMessageImpl.java | 117 +- .../journal/LargeServerMessagePersister.java | 73 ++ .../journal/codec/LargeMessageEncoding.java | 55 - .../journal/codec/LargeMessagePersister.java | 63 ++ .../nullpm/NullStorageLargeServerMessage.java | 18 +- .../impl/nullpm/NullStorageManager.java | 16 +- .../artemis/core/postoffice/Binding.java | 9 +- .../artemis/core/postoffice/Bindings.java | 6 +- .../artemis/core/postoffice/PostOffice.java | 18 +- .../core/postoffice/impl/BindingsImpl.java | 27 +- .../core/postoffice/impl/DivertBinding.java | 8 +- .../core/postoffice/impl/LocalQueueBinding.java | 8 +- .../core/postoffice/impl/PostOfficeImpl.java | 84 +- .../core/protocol/ServerPacketDecoder.java | 6 +- .../core/ServerSessionPacketHandler.java | 82 +- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../core/impl/CoreProtocolManagerFactory.java | 14 + .../protocol/core/impl/CoreSessionCallback.java | 8 +- .../impl/wireformat/ReplicationAddMessage.java | 14 +- .../wireformat/ReplicationAddTXMessage.java | 13 +- .../wireformat/ReplicationPageWriteMessage.java | 2 +- .../core/remoting/server/RemotingService.java | 4 + .../server/impl/RemotingServiceImpl.java | 11 +- .../core/replication/ReplicatedJournal.java | 52 +- .../core/replication/ReplicationEndpoint.java | 7 +- .../core/replication/ReplicationManager.java | 11 +- .../core/server/ActiveMQServerLogger.java | 9 +- .../activemq/artemis/core/server/Bindable.java | 6 +- .../artemis/core/server/LargeServerMessage.java | 3 +- .../artemis/core/server/MessageReference.java | 10 +- .../activemq/artemis/core/server/Queue.java | 3 +- .../artemis/core/server/ServerMessage.java | 78 -- .../artemis/core/server/ServerSession.java | 22 +- .../core/server/cluster/Transformer.java | 4 +- .../core/server/cluster/impl/BridgeImpl.java | 17 +- .../cluster/impl/ClusterConnectionBridge.java | 14 +- .../core/server/cluster/impl/Redistributor.java | 3 +- .../cluster/impl/RemoteQueueBindingImpl.java | 14 +- .../core/server/impl/ActiveMQServerImpl.java | 1 + .../artemis/core/server/impl/DivertImpl.java | 10 +- .../artemis/core/server/impl/JournalLoader.java | 6 +- .../core/server/impl/LastValueQueue.java | 10 +- .../core/server/impl/MessageReferenceImpl.java | 24 +- .../server/impl/PostOfficeJournalLoader.java | 7 +- .../artemis/core/server/impl/QueueImpl.java | 65 +- .../artemis/core/server/impl/RefsOperation.java | 4 +- .../core/server/impl/ScaleDownHandler.java | 45 +- .../core/server/impl/ServerConsumerImpl.java | 27 +- .../core/server/impl/ServerMessageImpl.java | 341 ------ .../core/server/impl/ServerSessionImpl.java | 118 +- .../server/management/ManagementService.java | 5 +- .../management/impl/ManagementServiceImpl.java | 12 +- .../core/transaction/TransactionDetail.java | 9 +- .../transaction/impl/CoreTransactionDetail.java | 6 +- .../spi/core/protocol/MessageConverter.java | 7 +- .../spi/core/protocol/MessagePersister.java | 88 ++ .../spi/core/protocol/ProtocolManager.java | 2 + .../core/protocol/ProtocolManagerFactory.java | 15 + .../spi/core/protocol/SessionCallback.java | 6 +- .../resources/schema/artemis-configuration.xsd | 16 - .../core/config/impl/ConfigurationImplTest.java | 9 - .../artemis/core/filter/impl/FilterTest.java | 12 +- .../group/impl/ClusteredResetMockTest.java | 5 +- .../impl/ScheduledDeliveryHandlerTest.java | 196 ++-- .../transaction/impl/TransactionImplTest.java | 16 +- .../artemis/tests/util/ActiveMQTestBase.java | 13 +- .../resources/ConfigurationTest-full-config.xml | 2 - .../test/resources/artemis-configuration.xsd | 16 - .../jms/example/HatColourChangeTransformer.java | 2 +- .../example/AddForwardingTimeTransformer.java | 2 +- pom.xml | 3 +- .../amqp/client/util/UnmodifiableDelivery.java | 5 + .../journal/gcfree/EncodersBench.java | 5 +- .../byteman/JMSBridgeReconnectionTest.java | 4 +- .../tests/extras/byteman/MessageCopyTest.java | 163 --- .../integration/DuplicateDetectionTest.java | 6 +- .../tests/integration/amqp/ProtonTest.java | 22 +- .../integration/client/AckBatchSizeTest.java | 14 +- .../integration/client/AcknowledgeTest.java | 177 ++- .../tests/integration/client/ConsumerTest.java | 163 ++- .../integration/client/HangConsumerTest.java | 7 +- .../InVMNonPersistentMessageBufferTest.java | 36 +- .../client/InterruptedLargeMessageTest.java | 10 +- .../integration/client/LargeMessageTest.java | 4 +- .../integration/cluster/bridge/BridgeTest.java | 10 +- .../cluster/bridge/SimpleTransformer.java | 61 +- .../distribution/ClusterHeadersRemovedTest.java | 5 +- .../distribution/MessageRedistributionTest.java | 4 +- .../tests/integration/divert/DivertTest.java | 5 +- .../interceptors/InterceptorTest.java | 8 +- .../integration/journal/MessageJournalTest.java | 133 +++ .../journal/NIOJournalCompactTest.java | 6 +- .../integration/karaf/ContainerBaseTest.java | 64 ++ .../tests/integration/karaf/KarafBaseTest.java | 212 ++++ .../karaf/distribution/ArtemisFeatureTest.java | 101 ++ .../karaf/distribution/package-info.java | 21 + .../karaf/version/ProbeRemoteServer.java | 51 + .../integration/karaf/version/RemoteTest.java | 38 + .../karaf/version/VersionWireTest.java | 104 ++ .../integration/karaf/version/package-info.java | 21 + .../management/ManagementServiceImplTest.java | 24 +- .../DeleteMessagesOnStartupTest.java | 10 +- .../replication/ReplicationTest.java | 71 +- .../integration/server/FakeStorageManager.java | 6 +- .../storage/PersistMultiThreadTest.java | 31 +- .../stress/paging/PageCursorStressTest.java | 24 +- .../core/server/impl/QueueConcurrentTest.java | 6 +- tests/unit-tests/pom.xml | 6 + .../core/journal/impl/JournalImplTestUnit.java | 2 +- .../unit/core/message/impl/MessageImplTest.java | 9 +- .../tests/unit/core/paging/impl/PageTest.java | 42 +- .../core/paging/impl/PagingManagerImplTest.java | 16 +- .../core/paging/impl/PagingStoreImplTest.java | 38 +- .../core/postoffice/impl/BindingsImplTest.java | 16 +- .../unit/core/postoffice/impl/FakeQueue.java | 9 +- .../impl/WildcardAddressManagerUnitTest.java | 12 +- .../unit/core/server/impl/QueueImplTest.java | 4 +- .../unit/core/server/impl/fakes/FakeFilter.java | 7 +- .../server/impl/fakes/FakeJournalLoader.java | 6 +- .../core/server/impl/fakes/FakePostOffice.java | 22 +- .../tests/unit/util/FakePagingManager.java | 7 +- .../artemis/tests/unit/util/MemorySizeTest.java | 4 +- .../artemis/tests/unit/util/UTF8Test.java | 10 +- 252 files changed, 6442 insertions(+), 4208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java index 408aef5..2816aaf 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -50,16 +51,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)") public class PrintData extends OptionalLocking { + static { + MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance()); + } + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index 4f99181..b57b5c5 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -50,7 +50,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -75,7 +75,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; @@ -367,7 +367,7 @@ public final class XmlDataExporter extends OptionalLocking { // Order here is important. We must process the messages from the journal before we process those from the page // files in order to get the messages in the right order. for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) { - printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey()))); + printSingleMessageAsXML(messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey()))); } printPagedMessagesAsXML(); @@ -381,6 +381,8 @@ public final class XmlDataExporter extends OptionalLocking { */ private void printPagedMessagesAsXML() { try { + + // TODO-now: fix encodings ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()); final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory()); ExecutorFactory executorFactory = new ExecutorFactory() { @@ -456,7 +458,7 @@ public final class XmlDataExporter extends OptionalLocking { } } - private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException { + private void printSingleMessageAsXML(Message message, List<String> queues) throws XMLStreamException { xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); printMessageAttributes(message); printMessageProperties(message); @@ -466,7 +468,7 @@ public final class XmlDataExporter extends OptionalLocking { messagesPrinted++; } - private void printMessageBody(ServerMessage message) throws XMLStreamException { + private void printMessageBody(Message message) throws XMLStreamException { xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); if (message.isLargeMessage()) { @@ -479,7 +481,7 @@ public final class XmlDataExporter extends OptionalLocking { private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); - BodyEncoder encoder = null; + LargeBodyEncoder encoder = null; try { encoder = message.getBodyEncoder(); @@ -522,7 +524,7 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end QUEUES_PARENT } - private void printMessageProperties(ServerMessage message) throws XMLStreamException { + private void printMessageProperties(Message message) throws XMLStreamException { xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT); for (SimpleString key : message.getPropertyNames()) { Object value = message.getObjectProperty(key); @@ -539,7 +541,7 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end PROPERTIES_PARENT } - private void printMessageAttributes(ServerMessage message) throws XMLStreamException { + private void printMessageAttributes(Message message) throws XMLStreamException { xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java index 8ee7678..a3807bd 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java @@ -17,10 +17,9 @@ package org.apache.activemq.artemis.cli.commands.tools; import com.google.common.base.Preconditions; - import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.utils.Base64; /** @@ -92,10 +91,10 @@ public class XmlDataExporterUtil { * @param message * @return */ - public static String encodeMessageBody(final ServerMessage message) { + public static String encodeMessageBody(final Message message) { Preconditions.checkNotNull(message, "ServerMessage can not be null"); - int size = message.getEndOfBodyPosition() - message.getBodyBuffer().readerIndex(); + int size = ((CoreMessage)message.toCore()).getEndOfBodyPosition() - message.getBodyBuffer().readerIndex(); byte[] buffer = new byte[size]; message.getBodyBuffer().readBytes(buffer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java index 8e2bb9f..0f06738 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java @@ -59,7 +59,6 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.cli.commands.ActionAbstract; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -387,7 +386,7 @@ public final class XmlDataImporter extends ActionAbstract { logger.debug(logMessage); } - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array()); try (ClientProducer producer = session.createProducer(destination)) { producer.send(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java new file mode 100644 index 0000000..2f00c5d --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis; + +public interface Closeable { + void close(boolean failed); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java index 5446f3f..3a208a6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java @@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput { */ void writeBytes(ByteBuffer src); + + /** + * Transfers the specified source buffer's data to this buffer starting at + * the current {@code writerIndex} until the source buffer's position + * reaches its limit, and increases the {@code writerIndex} by the + * number of the transferred bytes. + * + * @param src The source buffer + * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than + * {@code this.writableBytes} + */ + void writeBytes(ByteBuf src, int srcIndex, int length); + /** * Returns a copy of this buffer's readable bytes. Modifying the content * of the returned buffer or this buffer does not affect each other at all. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java index 32f9279..25fcfea 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; @@ -76,6 +77,20 @@ public final class ActiveMQBuffers { } /** + * Creates an ActiveMQBuffer wrapping an underlying ByteBuf + * + * The position on this buffer won't affect the position on the inner buffer + * + * @param underlying the underlying NIO ByteBuffer + * @return an ActiveMQBuffer wrapping the underlying NIO ByteBuffer + */ + public static ActiveMQBuffer wrappedBuffer(final ByteBuf underlying) { + ActiveMQBuffer buff = new ChannelBufferWrapper(underlying.duplicate()); + + return buff; + } + + /** * Creates an ActiveMQBuffer wrapping an underlying byte array * * @param underlying the underlying byte array http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index b7f70c6..e8530e6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -134,6 +135,39 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl } + public static SimpleString readNullableSimpleString(ByteBuf buffer) { + int b = buffer.readByte(); + if (b == DataConstants.NULL) { + return null; + } + return readSimpleString(buffer); + } + + + public static SimpleString readSimpleString(ByteBuf buffer) { + int len = buffer.readInt(); + byte[] data = new byte[len]; + buffer.readBytes(data); + return new SimpleString(data); + } + + public static void writeNullableSimpleString(ByteBuf buffer, SimpleString val) { + if (val == null) { + buffer.writeByte(DataConstants.NULL); + } else { + buffer.writeByte(DataConstants.NOT_NULL); + writeSimpleString(buffer, val); + } + } + + public static void writeSimpleString(ByteBuf buffer, SimpleString val) { + byte[] data = val.getData(); + buffer.writeInt(data.length); + buffer.writeBytes(data); + } + + + public SimpleString subSeq(final int start, final int end) { int len = data.length >> 1; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java index 690dbd7..b2660fa 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java @@ -66,11 +66,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public SimpleString readNullableSimpleString() { - int b = buffer.readByte(); - if (b == DataConstants.NULL) { - return null; - } - return readSimpleStringInternal(); + return SimpleString.readNullableSimpleString(buffer); } @Override @@ -84,14 +80,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public SimpleString readSimpleString() { - return readSimpleStringInternal(); - } - - private SimpleString readSimpleStringInternal() { - int len = buffer.readInt(); - byte[] data = new byte[len]; - buffer.readBytes(data); - return new SimpleString(data); + return SimpleString.readSimpleString(buffer); } @Override @@ -111,11 +100,21 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { } else if (len < 0xfff) { return readUTF(); } else { - return readSimpleStringInternal().toString(); + return SimpleString.readNullableSimpleString(buffer).toString(); } } @Override + public void writeNullableString(String val) { + UTF8Util.writeNullableString(buffer, val); + } + + @Override + public void writeUTF(String utf) { + UTF8Util.saveUTF(buffer, utf); + } + + @Override public String readUTF() { return UTF8Util.readUTF(this); } @@ -127,62 +126,17 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public void writeNullableSimpleString(final SimpleString val) { - if (val == null) { - buffer.writeByte(DataConstants.NULL); - } else { - buffer.writeByte(DataConstants.NOT_NULL); - writeSimpleStringInternal(val); - } - } - - @Override - public void writeNullableString(final String val) { - if (val == null) { - buffer.writeByte(DataConstants.NULL); - } else { - buffer.writeByte(DataConstants.NOT_NULL); - writeStringInternal(val); - } + SimpleString.writeNullableSimpleString(buffer, val); } @Override public void writeSimpleString(final SimpleString val) { - writeSimpleStringInternal(val); - } - - private void writeSimpleStringInternal(final SimpleString val) { - byte[] data = val.getData(); - buffer.writeInt(data.length); - buffer.writeBytes(data); + SimpleString.writeSimpleString(buffer, val); } @Override public void writeString(final String val) { - writeStringInternal(val); - } - - private void writeStringInternal(final String val) { - int length = val.length(); - - buffer.writeInt(length); - - if (length < 9) { - // If very small it's more performant to store char by char - for (int i = 0; i < val.length(); i++) { - buffer.writeShort((short) val.charAt(i)); - } - } else if (length < 0xfff) { - // Store as UTF - this is quicker than char by char for most strings - writeUTF(val); - } else { - // Store as SimpleString, since can't store utf > 0xffff in length - writeSimpleStringInternal(new SimpleString(val)); - } - } - - @Override - public void writeUTF(final String utf) { - UTF8Util.saveUTF(this, utf); + UTF8Util.writeString(buffer, val); } @Override @@ -576,6 +530,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { } @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + buffer.writeBytes(src, srcIndex, length); + } + + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { buffer.writeBytes(src.byteBuf(), srcIndex, length); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java new file mode 100644 index 0000000..fd68a77 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; + +public interface Persister<T extends Object> { + + int getEncodeSize(T record); + + void encode(ActiveMQBuffer buffer, T record); + + T decode(ActiveMQBuffer buffer, T record); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index bee8790..e70891d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -101,6 +101,14 @@ public class ByteUtil { } public static String bytesToHex(byte[] bytes, int groupSize) { + if (bytes == null) { + return "NULL"; + } + + if (bytes.length == 0) { + return "[]"; + } + char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; int outPos = 0; for (int j = 0; j < bytes.length; j++) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java index 56cec48..a421484 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -47,21 +47,23 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING; * This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification * (Version 1.1 April 12, 2002). * <p> - * TODO - should have typed property getters and do conversions herein */ public final class TypedProperties { - private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_"); - private Map<SimpleString, PropertyValue> properties; private volatile int size; - private boolean internalProperties; - public TypedProperties() { } + /** + * Return the number of properites + * */ + public int size() { + return properties.size(); + } + public int getMemoryOffset() { // The estimate is basically the encode size + 2 object references for each entry in the map // Note we don't include the attributes or anything else since they already included in the memory estimate @@ -75,10 +77,6 @@ public final class TypedProperties { size = other.size; } - public boolean hasInternalProperties() { - return internalProperties; - } - public void putBooleanProperty(final SimpleString key, final boolean value) { checkCreateProperties(); doPutValue(key, new BooleanValue(value)); @@ -321,7 +319,7 @@ public final class TypedProperties { } } - public synchronized void decode(final ActiveMQBuffer buffer) { + public synchronized void decode(final ByteBuf buffer) { byte b = buffer.readByte(); if (b == DataConstants.NULL) { @@ -406,7 +404,7 @@ public final class TypedProperties { } } - public synchronized void encode(final ActiveMQBuffer buffer) { + public synchronized void encode(final ByteBuf buffer) { if (properties == null) { buffer.writeByte(DataConstants.NULL); } else { @@ -499,10 +497,6 @@ public final class TypedProperties { } private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { - if (key.startsWith(AMQ_PROPNAME)) { - internalProperties = true; - } - PropertyValue oldValue = properties.put(key, value); if (oldValue != null) { size += value.encodeSize() - oldValue.encodeSize(); @@ -547,7 +541,7 @@ public final class TypedProperties { abstract Object getValue(); - abstract void write(ActiveMQBuffer buffer); + abstract void write(ByteBuf buffer); abstract int encodeSize(); @@ -568,7 +562,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.NULL); } @@ -587,7 +581,7 @@ public final class TypedProperties { this.val = val; } - private BooleanValue(final ActiveMQBuffer buffer) { + private BooleanValue(final ByteBuf buffer) { val = buffer.readBoolean(); } @@ -597,7 +591,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.BOOLEAN); buffer.writeBoolean(val); } @@ -617,7 +611,7 @@ public final class TypedProperties { this.val = val; } - private ByteValue(final ActiveMQBuffer buffer) { + private ByteValue(final ByteBuf buffer) { val = buffer.readByte(); } @@ -627,7 +621,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.BYTE); buffer.writeByte(val); } @@ -646,7 +640,7 @@ public final class TypedProperties { this.val = val; } - private BytesValue(final ActiveMQBuffer buffer) { + private BytesValue(final ByteBuf buffer) { int len = buffer.readInt(); val = new byte[len]; buffer.readBytes(val); @@ -658,7 +652,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.BYTES); buffer.writeInt(val.length); buffer.writeBytes(val); @@ -679,7 +673,7 @@ public final class TypedProperties { this.val = val; } - private ShortValue(final ActiveMQBuffer buffer) { + private ShortValue(final ByteBuf buffer) { val = buffer.readShort(); } @@ -689,7 +683,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.SHORT); buffer.writeShort(val); } @@ -708,7 +702,7 @@ public final class TypedProperties { this.val = val; } - private IntValue(final ActiveMQBuffer buffer) { + private IntValue(final ByteBuf buffer) { val = buffer.readInt(); } @@ -718,7 +712,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.INT); buffer.writeInt(val); } @@ -737,7 +731,7 @@ public final class TypedProperties { this.val = val; } - private LongValue(final ActiveMQBuffer buffer) { + private LongValue(final ByteBuf buffer) { val = buffer.readLong(); } @@ -747,7 +741,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.LONG); buffer.writeLong(val); } @@ -766,7 +760,7 @@ public final class TypedProperties { this.val = val; } - private FloatValue(final ActiveMQBuffer buffer) { + private FloatValue(final ByteBuf buffer) { val = Float.intBitsToFloat(buffer.readInt()); } @@ -776,7 +770,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.FLOAT); buffer.writeInt(Float.floatToIntBits(val)); } @@ -796,7 +790,7 @@ public final class TypedProperties { this.val = val; } - private DoubleValue(final ActiveMQBuffer buffer) { + private DoubleValue(final ByteBuf buffer) { val = Double.longBitsToDouble(buffer.readLong()); } @@ -806,7 +800,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.DOUBLE); buffer.writeLong(Double.doubleToLongBits(val)); } @@ -825,7 +819,7 @@ public final class TypedProperties { this.val = val; } - private CharValue(final ActiveMQBuffer buffer) { + private CharValue(final ByteBuf buffer) { val = (char) buffer.readShort(); } @@ -835,7 +829,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.CHAR); buffer.writeShort((short) val); } @@ -854,8 +848,8 @@ public final class TypedProperties { this.val = val; } - private StringValue(final ActiveMQBuffer buffer) { - val = buffer.readSimpleString(); + private StringValue(final ByteBuf buffer) { + val = SimpleString.readSimpleString(buffer); } @Override @@ -864,9 +858,9 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.STRING); - buffer.writeSimpleString(val); + SimpleString.writeSimpleString(buffer, val); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java index e75395b..84e1557 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java @@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils; import java.lang.ref.SoftReference; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; @@ -29,15 +31,43 @@ import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; */ public final class UTF8Util { + + private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled(); + + private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>(); + private UTF8Util() { // utility class } + public static void writeNullableString(ByteBuf buffer, final String val) { + if (val == null) { + buffer.writeByte(DataConstants.NULL); + } else { + buffer.writeByte(DataConstants.NOT_NULL); + writeString(buffer, val); + } + } - private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled(); + public static void writeString(final ByteBuf buffer, final String val) { + int length = val.length(); - private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>(); + buffer.writeInt(length); + + if (length < 9) { + // If very small it's more performant to store char by char + for (int i = 0; i < val.length(); i++) { + buffer.writeShort((short) val.charAt(i)); + } + } else if (length < 0xfff) { + // Store as UTF - this is quicker than char by char for most strings + saveUTF(buffer, val); + } else { + // Store as SimpleString, since can't store utf > 0xffff in length + SimpleString.writeSimpleString(buffer, new SimpleString(val)); + } + } - public static void saveUTF(final ActiveMQBuffer out, final String str) { + public static void saveUTF(final ByteBuf out, final String str) { StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); if (str.length() > 0xffff) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java index 8013e96..cb6c8fe 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java @@ -187,12 +187,12 @@ public class TypedPropertiesTest { props.putSimpleStringProperty(keyToRemove, RandomUtil.randomSimpleString()); ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); - props.encode(buffer); + props.encode(buffer.byteBuf()); Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex()); TypedProperties decodedProps = new TypedProperties(); - decodedProps.decode(buffer); + decodedProps.decode(buffer.byteBuf()); TypedPropertiesTest.assertEqualsTypeProperties(props, decodedProps); @@ -200,7 +200,7 @@ public class TypedPropertiesTest { // After removing a property, you should still be able to encode the Property props.removeProperty(keyToRemove); - props.encode(buffer); + props.encode(buffer.byteBuf()); Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex()); } @@ -210,12 +210,12 @@ public class TypedPropertiesTest { TypedProperties emptyProps = new TypedProperties(); ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); - emptyProps.encode(buffer); + emptyProps.encode(buffer.byteBuf()); Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex()); TypedProperties decodedProps = new TypedProperties(); - decodedProps.decode(buffer); + decodedProps.decode(buffer.byteBuf()); TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 38ec105..c0d9db6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -262,12 +262,6 @@ public final class ActiveMQDefaultConfiguration { // The minimal number of data files before we can start compacting private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10; - // XXX Only meant to be used by project developers - private static int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1; - - // XXX Only meant to be used by project developers - private static boolean DEFAULT_RUN_SYNC_SPEED_TEST = false; - // Interval to log server specific information (e.g. memory usage etc) private static long DEFAULT_SERVER_DUMP_INTERVAL = -1; @@ -801,20 +795,6 @@ public final class ActiveMQDefaultConfiguration { } /** - * XXX Only meant to be used by project developers - */ - public static int getDefaultJournalPerfBlastPages() { - return DEFAULT_JOURNAL_PERF_BLAST_PAGES; - } - - /** - * XXX Only meant to be used by project developers - */ - public static boolean isDefaultRunSyncSpeedTest() { - return DEFAULT_RUN_SYNC_SPEED_TEST; - } - - /** * Interval to log server specific information (e.g. memory usage etc) */ public static long getDefaultServerDumpInterval() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 80116ed..4a5381c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -16,10 +16,14 @@ */ package org.apache.activemq.artemis.api.core; +import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.activemq.artemis.utils.UUID; +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.encode.BodyType; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.persistence.Persister; /** * A Message is a routable instance that has a payload. @@ -48,9 +52,41 @@ import org.apache.activemq.artemis.utils.UUID; * <p> * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a * {@code boolean}), a {@link ActiveMQPropertyConversionException} will be thrown. + * + * + * User cases that will be covered by Message + * + * Receiving a buffer: + * + * Message encode = new CoreMessage(); // or any other implementation + * encode.receiveBuffer(buffer); + * + * + * Sending to a buffer: + * + * Message encode; + * size = encode.getEncodeSize(); + * encode.encodeDirectly(bufferOutput); + * + * + * Disabling temporary buffer: + * + * // This will make the message to only be encoded directly to the output stream, useful on client core API + * encode.disableInternalBuffer(); + */ public interface Message { + + SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO"); + + SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO"); + + SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO"); + + // used by the bridges to set duplicates + SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP"); + /** * the actual time the message was expired. * * * @@ -129,6 +165,60 @@ public interface Message { byte STREAM_TYPE = 6; + + void messageChanged(); + + /** + * Careful: Unless you are changing the body of the message, prefer getReadOnlyBodyBuffer + */ + ActiveMQBuffer getBodyBuffer(); + + ActiveMQBuffer getReadOnlyBodyBuffer(); + + /** Used in the cases of large messages */ + LargeBodyEncoder getBodyEncoder() throws ActiveMQException; + + /** Context can be used by the application server to inject extra control, like a protocol specific on the server. + * There is only one per Object, use it wisely! + * + * Note: the intent of this was to replace PageStore reference on Message, but it will be later increased by adidn a ServerPojo + * */ + RefCountMessageListener getContext(); + + Message setContext(RefCountMessageListener context); + + /** The buffer will belong to this message, until release is called. */ + Message setBuffer(ByteBuf buffer); + + // TODO-now: Do we need this? + byte getType(); + + // TODO-now: Do we need this? + Message setType(byte type); + + /** + * Returns whether this message is a <em>large message</em> or a regular message. + */ + boolean isLargeMessage(); + + /** + * TODO: There's currently some treatment on LargeMessage that is done for server's side large message + * This needs to be refactored, this Method shouldn't be used at all. + * @Deprecated do not use this, internal use only. *It will* be removed for sure even on minor releases. + * */ + @Deprecated + default boolean isServerMessage() { + return false; + } + + ByteBuf getBuffer(); + + /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ + Message copy(); + + /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ + Message copy(long newID); + /** * Returns the messageID. * <br> @@ -136,39 +226,43 @@ public interface Message { */ long getMessageID(); + Message setMessageID(long id); + /** - * Returns the userID - this is an optional user specified UUID that can be set to identify the message - * and will be passed around with the message - * - * @return the user id + * Returns the expiration time of this message. */ - UUID getUserID(); + long getExpiration(); /** - * Sets the user ID + * Sets the expiration of this message. * - * @param userID + * @param expiration expiration time */ - Message setUserID(UUID userID); + Message setExpiration(long expiration); /** - * Returns the address this message is sent to. + * Returns whether this message is expired or not. */ - SimpleString getAddress(); + default boolean isExpired() { + if (getExpiration() == 0) { + return false; + } + + return System.currentTimeMillis() - getExpiration() >= 0; + } + /** - * Sets the address to send this message to. + * Returns the userID - this is an optional user specified UUID that can be set to identify the message + * and will be passed around with the message * - * @param address address to send the message to + * @return the user id */ - Message setAddress(SimpleString address); + Object getUserID(); - /** - * Returns this message type. - * <p> - * See fields {@literal *_TYPE} for possible values. - */ - byte getType(); + Message setUserID(Object userID); + + void copyHeadersAndProperties(final Message msg); /** * Returns whether this message is durable or not. @@ -182,36 +276,28 @@ public interface Message { */ Message setDurable(boolean durable); - /** - * Returns the expiration time of this message. - */ - long getExpiration(); + Persister<Message> getPersister(); - /** - * Returns whether this message is expired or not. - */ - boolean isExpired(); + Object getProtocol(); - /** - * Sets the expiration of this message. - * - * @param expiration expiration time - */ - Message setExpiration(long expiration); + Message setProtocol(Object protocol); + + Object getBody(); + + BodyType getBodyType(); + + Message setBody(BodyType type, Object body); + + String getAddress(); + + Message setAddress(String address); + + SimpleString getAddressSimpleString(); + + Message setAddress(SimpleString address); - /** - * Returns the message timestamp. - * <br> - * The timestamp corresponds to the time this message - * was handled by an ActiveMQ Artemis server. - */ long getTimestamp(); - /** - * Sets the message timestamp. - * - * @param timestamp timestamp - */ Message setTimestamp(long timestamp); /** @@ -230,164 +316,128 @@ public interface Message { */ Message setPriority(byte priority); - /** - * Returns the size of the <em>encoded</em> message. - */ - int getEncodeSize(); + /** Used to receive this message from an encoded medium buffer */ + void receiveBuffer(ByteBuf buffer); - /** - * Returns whether this message is a <em>large message</em> or a regular message. - */ - boolean isLargeMessage(); + /** Used to send this message to an encoded medium buffer. + * @param buffer the buffer used. + * @param deliveryCount Some protocols (AMQP) will have this as part of the message. */ + void sendBuffer(ByteBuf buffer, int deliveryCount); - /** - * Returns the message body as an ActiveMQBuffer - */ - ActiveMQBuffer getBodyBuffer(); + int getPersistSize(); - /** - * Writes the input byte array to the message body ActiveMQBuffer - */ - Message writeBodyBufferBytes(byte[] bytes); + void persist(ActiveMQBuffer targetRecord); - /** - * Writes the input String to the message body ActiveMQBuffer - */ - Message writeBodyBufferString(String string); + void reloadPersistence(ActiveMQBuffer record); - /** - * Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification - * of this buffer should not impact the underlying buffer. - */ - ActiveMQBuffer getBodyBufferDuplicate(); + default void releaseBuffer() { + ByteBuf buffer = getBuffer(); + if (buffer != null) { + buffer.release(); + } + setBuffer(null); + } - // Properties - // ----------------------------------------------------------------- + default String getText() { + if (getBodyType() == BodyType.Text) { + return getBody().toString(); + } else { + return null; + } + } - /** - * Puts a boolean property in this message. - * - * @param key property name - * @param value property value - */ - Message putBooleanProperty(SimpleString key, boolean value); + // TODO-now: move this to some utility class + default void referenceOriginalMessage(final Message original, String originalQueue) { + String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString()); - /** - * @see #putBooleanProperty(SimpleString, boolean) - */ - Message putBooleanProperty(String key, boolean value); + if (queueOnMessage != null) { + putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage); + } else if (originalQueue != null) { + putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue); + } - /** - * Puts a byte property in this message. - * - * @param key property name - * @param value property value - */ - Message putByteProperty(SimpleString key, byte value); + if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) { + putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString())); - /** - * @see #putByteProperty(SimpleString, byte) - */ - Message putByteProperty(String key, byte value); + putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString())); + } else { + putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress()); - /** - * Puts a byte[] property in this message. - * - * @param key property name - * @param value property value - */ - Message putBytesProperty(SimpleString key, byte[] value); + putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID()); + } - /** - * @see #putBytesProperty(SimpleString, byte[]) - */ - Message putBytesProperty(String key, byte[] value); + // reset expiry + setExpiration(0); + } /** - * Puts a short property in this message. - * - * @param key property name - * @param value property value + * it will translate a property named HDR_DUPLICATE_DETECTION_ID. + * TODO-NOW: this can probably be replaced by an utility. + * @return */ - Message putShortProperty(SimpleString key, short value); + default byte[] getDuplicateIDBytes() { + Object duplicateID = getDuplicateProperty(); - /** - * @see #putShortProperty(SimpleString, short) - */ - Message putShortProperty(String key, short value); + if (duplicateID == null) { + return null; + } else { + if (duplicateID instanceof SimpleString) { + return ((SimpleString) duplicateID).getData(); + } else if (duplicateID instanceof String) { + return new SimpleString(duplicateID.toString()).getData(); + } else { + return (byte[]) duplicateID; + } + } + } /** - * Puts a char property in this message. - * - * @param key property name - * @param value property value + * it will translate a property named HDR_DUPLICATE_DETECTION_ID. + * TODO-NOW: this can probably be replaced by an utility. + * @return */ - Message putCharProperty(SimpleString key, char value); + default Object getDuplicateProperty() { + return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString()); + } - /** - * @see #putCharProperty(SimpleString, char) - */ - Message putCharProperty(String key, char value); - /** - * Puts an int property in this message. - * - * @param key property name - * @param value property value - */ - Message putIntProperty(SimpleString key, int value); + Message putBooleanProperty(String key, boolean value); - /** - * @see #putIntProperty(SimpleString, int) - */ - Message putIntProperty(String key, int value); + Message putByteProperty(String key, byte value); - /** - * Puts a long property in this message. - * - * @param key property name - * @param value property value - */ - Message putLongProperty(SimpleString key, long value); + Message putBytesProperty(String key, byte[] value); - /** - * @see #putLongProperty(SimpleString, long) - */ - Message putLongProperty(String key, long value); + Message putShortProperty(String key, short value); - /** - * Puts a float property in this message. - * - * @param key property name - * @param value property value - */ - Message putFloatProperty(SimpleString key, float value); + Message putCharProperty(String key, char value); - /** - * @see #putFloatProperty(SimpleString, float) - */ - Message putFloatProperty(String key, float value); + Message putIntProperty(String key, int value); - /** - * Puts a double property in this message. - * - * @param key property name - * @param value property value - */ - Message putDoubleProperty(SimpleString key, double value); + Message putLongProperty(String key, long value); + + Message putFloatProperty(String key, float value); - /** - * @see #putDoubleProperty(SimpleString, double) - */ Message putDoubleProperty(String key, double value); - /** - * Puts a SimpleString property in this message. - * - * @param key property name - * @param value property value - */ - Message putStringProperty(SimpleString key, SimpleString value); + + + Message putBooleanProperty(SimpleString key, boolean value); + + Message putByteProperty(SimpleString key, byte value); + + Message putBytesProperty(SimpleString key, byte[] value); + + Message putShortProperty(SimpleString key, short value); + + Message putCharProperty(SimpleString key, char value); + + Message putIntProperty(SimpleString key, int value); + + Message putLongProperty(SimpleString key, long value); + + Message putFloatProperty(SimpleString key, float value); + + Message putDoubleProperty(SimpleString key, double value); /** * Puts a String property in this message. @@ -397,202 +447,125 @@ public interface Message { */ Message putStringProperty(String key, String value); - /** - * Puts an Object property in this message. <br> - * Accepted types are: - * <ul> - * <li>Boolean</li> - * <li>Byte</li> - * <li>Short</li> - * <li>Character</li> - * <li>Integer</li> - * <li>Long</li> - * <li>Float</li> - * <li>Double</li> - * <li>String</li> - * <li>SimpleString</li> - * </ul> - * Using any other type will throw a PropertyConversionException. - * - * @param key property name - * @param value property value - * @throws ActiveMQPropertyConversionException if the value is not one of the accepted property - * types. - */ - Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException; - - /** - * @see #putObjectProperty(SimpleString, Object) - */ Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException; - /** - * Removes the property corresponding to the specified key. - * - * @param key property name - * @return the value corresponding to the specified key or @{code null} - */ - Object removeProperty(SimpleString key); + Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException; - /** - * @see #removeProperty(SimpleString) - */ Object removeProperty(String key); - /** - * Returns {@code true} if this message contains a property with the given key, {@code false} else. - * - * @param key property name - */ - boolean containsProperty(SimpleString key); - - /** - * @see #containsProperty(SimpleString) - */ boolean containsProperty(String key); - /** - * Returns the property corresponding to the specified key as a Boolean. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Boolean - */ - Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getBooleanProperty(SimpleString) - */ Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Byte. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Byte - */ - Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getByteProperty(SimpleString) - */ Byte getByteProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Double. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Double - */ - Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getDoubleProperty(SimpleString) - */ Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as an Integer. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to an Integer - */ - Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getIntProperty(SimpleString) - */ Integer getIntProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Long. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Long - */ - Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getLongProperty(SimpleString) - */ Long getLongProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key - */ - Object getObjectProperty(SimpleString key); - - /** - * @see #getBooleanProperty(SimpleString) - */ Object getObjectProperty(String key); - /** - * Returns the property corresponding to the specified key as a Short. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Short - */ - Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getShortProperty(SimpleString) - */ Short getShortProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a Float. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a Float - */ - Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getFloatProperty(SimpleString) - */ Float getFloatProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a String. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a String - */ - String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getStringProperty(SimpleString) - */ String getStringProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a SimpleString. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a SimpleString - */ - SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException; - - /** - * @see #getSimpleStringProperty(SimpleString) - */ SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException; - /** - * Returns the property corresponding to the specified key as a byte[]. - * - * @throws ActiveMQPropertyConversionException if the value can not be converted to a byte[] - */ + byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException; + + + Object removeProperty(SimpleString key); + + boolean containsProperty(SimpleString key); + + Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Object getObjectProperty(SimpleString key); + + Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException; + + SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException; + byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException; + Message putStringProperty(SimpleString key, SimpleString value); + /** - * @see #getBytesProperty(SimpleString) + * Returns the size of the <em>encoded</em> message. */ - byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException; + int getEncodeSize(); /** * Returns all the names of the properties for this message. */ Set<SimpleString> getPropertyNames(); + + + int getRefCount(); + + int incrementRefCount() throws Exception; + + int decrementRefCount() throws Exception; + + int incrementDurableRefCount(); + + int decrementDurableRefCount(); + /** * @return Returns the message in Map form, useful when encoding to JSON */ - Map<String, Object> toMap(); + default Map<String, Object> toMap() { + Map map = toPropertyMap(); + map.put("messageID", getMessageID()); + Object userID = getUserID(); + if (getUserID() != null) { + map.put("userID", "ID:" + userID.toString()); + } + + map.put("address", getAddress()); + map.put("type", getBodyType().toString()); + map.put("durable", isDurable()); + map.put("expiration", getExpiration()); + map.put("timestamp", getTimestamp()); + map.put("priority", (int)getPriority()); + + return map; + } /** * @return Returns the message properties in Map form, useful when encoding to JSON */ - Map<String, Object> toPropertyMap(); + default Map<String, Object> toPropertyMap() { + Map map = new HashMap<>(); + for (SimpleString name : getPropertyNames()) { + map.put(name.toString(), getObjectProperty(name.toString())); + } + return map; + } + + + /** This should make you convert your message into Core format. */ + Message toCore(); + + int getMemoryEstimate(); + + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java new file mode 100644 index 0000000..64dd44d --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class RefCountMessage implements Message { + + private final AtomicInteger durableRefCount = new AtomicInteger(); + + private final AtomicInteger refCount = new AtomicInteger(); + + private RefCountMessageListener context; + + @Override + public Message setContext(RefCountMessageListener context) { + this.context = context; + return this; + } + + @Override + public RefCountMessageListener getContext() { + return context; + } + + @Override + public int getRefCount() { + return refCount.get(); + } + + @Override + public int incrementRefCount() throws Exception { + int count = refCount.incrementAndGet(); + if (context != null) { + context.nonDurableUp(this, count); + } + return count; + } + + @Override + public int incrementDurableRefCount() { + int count = durableRefCount.incrementAndGet(); + if (context != null) { + context.durableUp(this, count); + } + return count; + } + + @Override + public int decrementDurableRefCount() { + int count = durableRefCount.decrementAndGet(); + if (context != null) { + context.durableDown(this, count); + } + return count; + } + + @Override + public int decrementRefCount() throws Exception { + int count = refCount.decrementAndGet(); + if (context != null) { + context.nonDurableDown(this, count); + } + return count; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java new file mode 100644 index 0000000..e68dffd --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +/** If {@link Message#getContext()} != null and is implementing this interface. + * These methods will be called during refCount operations */ +public interface RefCountMessageListener { + + void durableUp(Message message, int durableCount); + + void durableDown(Message message, int durableCount); + + void nonDurableUp(Message message, int nonDurableCoun); + + void nonDurableDown(Message message, int nonDurableCoun); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java index e87d365..daded00 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java @@ -234,24 +234,16 @@ public interface ClientMessage extends Message { * Overridden from {@link Message} to enable fluent API */ @Override - ClientMessage putStringProperty(SimpleString key, SimpleString value); - - /** - * Overridden from {@link Message} to enable fluent API - */ - @Override ClientMessage putStringProperty(String key, String value); /** * Overridden from {@link Message} to enable fluent API */ - @Override ClientMessage writeBodyBufferBytes(byte[] bytes); /** * Overridden from {@link Message} to enable fluent API */ - @Override ClientMessage writeBodyBufferString(String string); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java new file mode 100644 index 0000000..743583b --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core.encode; + +public enum BodyType { + Undefined, Bytes, Map, Object, Stream, Text +}