ARTEMIS-1009 Pure Message Encoding. With this we can send and receive messages in their raw format, without requiring conversion to Core.
- MessageImpl and ServerMessage are removed as part of this - AMQPMessage and CoreMessage will have the specialized message format for each protocol - The protocol manager is now responsible to send the message - The message will provide an encoder for journal and paging Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fe0ca4d8 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fe0ca4d8 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fe0ca4d8 Branch: refs/heads/master Commit: fe0ca4d84fc587b701e4b83ae8e5f2240c222842 Parents: c1fa5d0 Author: Clebert Suconic <[email protected]> Authored: Mon Feb 20 15:55:15 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Sun Mar 5 23:08:53 2017 -0500 ---------------------------------------------------------------------- .../artemis/cli/commands/tools/PrintData.java | 7 + .../cli/commands/tools/XmlDataExporter.java | 42 +- .../cli/commands/tools/XmlDataExporterUtil.java | 11 +- .../cli/commands/tools/XmlDataImporter.java | 10 +- .../org/apache/activemq/artemis/Closeable.java | 22 + .../artemis/api/core/ActiveMQBuffer.java | 13 + .../artemis/api/core/ActiveMQBuffers.java | 15 + .../activemq/artemis/api/core/SimpleString.java | 34 + .../core/buffers/impl/ChannelBufferWrapper.java | 84 +- .../artemis/core/persistence/Persister.java | 30 + .../apache/activemq/artemis/utils/ByteUtil.java | 8 + .../activemq/artemis/utils/TypedProperties.java | 62 +- .../apache/activemq/artemis/utils/UTF8Util.java | 36 +- .../artemis/utils/TypedPropertiesTest.java | 10 +- .../config/ActiveMQDefaultConfiguration.java | 20 - .../activemq/artemis/api/core/ICoreMessage.java | 90 ++ .../activemq/artemis/api/core/Message.java | 637 +++++------ .../artemis/api/core/RefCountMessage.java | 81 ++ .../api/core/RefCountMessageListener.java | 31 + .../artemis/api/core/client/ClientMessage.java | 69 +- 
.../api/core/management/ManagementHelper.java | 18 +- .../impl/ResetLimitWrappedActiveMQBuffer.java | 24 +- .../core/client/impl/ClientConsumerImpl.java | 4 +- .../client/impl/ClientLargeMessageImpl.java | 22 +- .../core/client/impl/ClientMessageImpl.java | 69 +- .../core/client/impl/ClientMessageInternal.java | 4 +- .../core/client/impl/ClientProducerImpl.java | 50 +- .../CompressedLargeMessageControllerImpl.java | 6 + .../client/impl/LargeMessageControllerImpl.java | 15 + .../artemis/core/message/BodyEncoder.java | 55 - .../artemis/core/message/LargeBodyEncoder.java | 55 + .../artemis/core/message/impl/CoreMessage.java | 1068 ++++++++++++++++++ .../core/message/impl/CoreMessagePersister.java | 66 ++ .../artemis/core/message/impl/MessageImpl.java | 1059 ----------------- .../core/message/impl/MessageInternal.java | 57 - .../core/impl/ActiveMQSessionContext.java | 17 +- .../core/protocol/core/impl/ChannelImpl.java | 1 + .../core/protocol/core/impl/PacketImpl.java | 30 +- .../core/impl/RemotingConnectionImpl.java | 1 + .../core/impl/wireformat/MessagePacket.java | 21 +- .../SessionReceiveClientLargeMessage.java | 5 +- .../wireformat/SessionReceiveLargeMessage.java | 14 +- .../impl/wireformat/SessionReceiveMessage.java | 60 +- .../SessionSendContinuationMessage.java | 8 +- .../wireformat/SessionSendLargeMessage.java | 12 +- .../impl/wireformat/SessionSendMessage.java | 54 +- .../activemq/artemis/reader/MapMessageUtil.java | 4 +- .../spi/core/remoting/SessionContext.java | 14 +- .../artemis/message/CoreMessageTest.java | 365 ++++++ .../jdbc/store/journal/JDBCJournalImpl.java | 36 +- .../jdbc/store/journal/JDBCJournalRecord.java | 7 +- .../api/jms/management/JMSManagementHelper.java | 3 +- .../jms/client/ActiveMQBytesMessage.java | 4 +- .../artemis/jms/client/ActiveMQMessage.java | 8 +- .../jms/transaction/JMSTransactionDetail.java | 12 +- .../artemis/core/journal/EncoderPersister.java | 51 + .../activemq/artemis/core/journal/Journal.java | 55 +- 
.../journal/impl/AbstractJournalUpdateTask.java | 3 +- .../core/journal/impl/FileWrapperJournal.java | 26 +- .../artemis/core/journal/impl/JournalBase.java | 63 +- .../core/journal/impl/JournalCompactor.java | 9 +- .../artemis/core/journal/impl/JournalImpl.java | 62 +- .../impl/dataformat/JournalAddRecord.java | 20 +- .../impl/dataformat/JournalAddRecordTX.java | 17 +- .../protocol/amqp/broker/AMQPMessage.java | 872 ++++++++++++++ .../amqp/broker/AMQPMessagePersister.java | 75 ++ .../amqp/broker/AMQPSessionCallback.java | 45 +- .../amqp/broker/ProtonProtocolManager.java | 12 +- .../broker/ProtonProtocolManagerFactory.java | 14 + .../amqp/converter/AMQPContentTypeSupport.java | 146 +++ .../protocol/amqp/converter/AMQPConverter.java | 44 + .../amqp/converter/AMQPMessageIdHelper.java | 252 +++++ .../amqp/converter/AMQPMessageSupport.java | 308 +++++ .../amqp/converter/AmqpCoreConverter.java | 351 ++++++ .../amqp/converter/CoreAmqpConverter.java | 461 ++++++++ .../amqp/converter/ProtonMessageConverter.java | 101 -- .../converter/jms/ServerJMSBytesMessage.java | 10 +- .../amqp/converter/jms/ServerJMSMapMessage.java | 6 +- .../amqp/converter/jms/ServerJMSMessage.java | 71 +- .../converter/jms/ServerJMSObjectMessage.java | 9 +- .../converter/jms/ServerJMSStreamMessage.java | 8 +- .../converter/jms/ServerJMSTextMessage.java | 6 +- .../message/AMQPContentTypeSupport.java | 146 --- .../converter/message/AMQPMessageIdHelper.java | 252 ----- .../converter/message/AMQPMessageSupport.java | 276 ----- .../converter/message/AMQPMessageTypes.java | 30 - .../message/AMQPNativeInboundTransformer.java | 44 - .../message/AMQPNativeOutboundTransformer.java | 80 -- .../message/AMQPRawInboundTransformer.java | 62 - .../amqp/converter/message/EncodedMessage.java | 67 -- .../converter/message/InboundTransformer.java | 243 ---- .../message/JMSMappingInboundTransformer.java | 196 ---- .../message/JMSMappingOutboundTransformer.java | 592 ---------- 
.../converter/message/OutboundTransformer.java | 53 - .../amqp/proton/AMQPConnectionContext.java | 4 + .../proton/ProtonServerReceiverContext.java | 39 +- .../amqp/proton/ProtonServerSenderContext.java | 30 +- .../amqp/proton/ProtonTransactionHandler.java | 3 +- .../amqp/proton/handler/ProtonHandler.java | 2 +- .../protocol/amqp/util/NettyReadable.java | 139 +++ .../artemis/protocol/amqp/util/TLSEncode.java | 52 + .../amqp/converter/TestConversions.java | 619 +--------- .../message/AMQPContentTypeSupportTest.java | 10 +- .../message/AMQPMessageIdHelperTest.java | 11 +- .../message/AMQPMessageSupportTest.java | 11 +- .../JMSMappingInboundTransformerTest.java | 234 +--- .../JMSMappingOutboundTransformerTest.java | 387 +------ .../JMSTransformationSpeedComparisonTest.java | 94 +- .../message/MessageTransformationTest.java | 150 +-- .../protocol/amqp/message/AMQPMessageTest.java | 63 ++ .../core/protocol/mqtt/MQTTProtocolManager.java | 6 - .../core/protocol/mqtt/MQTTPublishManager.java | 31 +- .../protocol/mqtt/MQTTRetainMessageManager.java | 8 +- .../core/protocol/mqtt/MQTTSessionCallback.java | 12 +- .../artemis/core/protocol/mqtt/MQTTUtil.java | 20 +- .../protocol/openwire/OpenWireConnection.java | 3 - .../openwire/OpenWireMessageConverter.java | 34 +- .../openwire/OpenWireProtocolManager.java | 9 +- .../core/protocol/openwire/OpenwireMessage.java | 499 ++++++++ .../core/protocol/openwire/amq/AMQConsumer.java | 9 +- .../core/protocol/openwire/amq/AMQSession.java | 14 +- .../protocol/openwire/util/OpenWireUtil.java | 12 +- .../ActiveMQStompProtocolMessageBundle.java | 3 +- .../core/protocol/stomp/StompConnection.java | 17 +- .../protocol/stomp/StompProtocolManager.java | 14 +- .../core/protocol/stomp/StompSession.java | 51 +- .../artemis/core/protocol/stomp/StompUtils.java | 6 +- .../stomp/VersionedStompFrameHandler.java | 23 +- .../stomp/v12/StompFrameHandlerV12.java | 8 +- .../artemis/core/config/Configuration.java | 8 - .../core/config/impl/ConfigurationImpl.java 
| 32 - .../deployers/impl/FileConfigurationParser.java | 4 - .../activemq/artemis/core/filter/Filter.java | 4 +- .../artemis/core/filter/impl/FilterImpl.java | 19 +- .../management/impl/AddressControlImpl.java | 6 +- .../core/management/impl/QueueControlImpl.java | 10 +- .../impl/openmbean/OpenTypeSupport.java | 21 +- .../artemis/core/paging/PagedMessage.java | 4 +- .../artemis/core/paging/PagingStore.java | 7 +- .../core/paging/cursor/PagedReferenceImpl.java | 16 +- .../cursor/impl/PageSubscriptionImpl.java | 4 +- .../activemq/artemis/core/paging/impl/Page.java | 5 +- .../core/paging/impl/PagedMessageImpl.java | 70 +- .../core/paging/impl/PagingStoreImpl.java | 46 +- .../core/persistence/StorageManager.java | 16 +- .../journal/AbstractJournalStorageManager.java | 60 +- .../impl/journal/AddMessageRecord.java | 8 +- .../impl/journal/DescribeJournal.java | 17 +- .../impl/journal/JournalRecordIds.java | 3 + .../impl/journal/JournalStorageManager.java | 14 +- .../journal/LargeMessageTXFailureCallback.java | 6 +- .../impl/journal/LargeServerMessageImpl.java | 108 +- .../journal/codec/LargeMessageEncoding.java | 55 - .../journal/codec/LargeMessagePersister.java | 62 + .../nullpm/NullStorageLargeServerMessage.java | 16 +- .../impl/nullpm/NullStorageManager.java | 15 +- .../artemis/core/postoffice/Binding.java | 9 +- .../artemis/core/postoffice/Bindings.java | 6 +- .../artemis/core/postoffice/PostOffice.java | 18 +- .../core/postoffice/impl/BindingsImpl.java | 26 +- .../core/postoffice/impl/DivertBinding.java | 8 +- .../core/postoffice/impl/LocalQueueBinding.java | 8 +- .../core/postoffice/impl/PostOfficeImpl.java | 121 +- .../core/protocol/ServerPacketDecoder.java | 6 +- .../core/ServerSessionPacketHandler.java | 83 +- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../protocol/core/impl/CoreProtocolManager.java | 13 +- .../core/impl/CoreProtocolManagerFactory.java | 14 + .../protocol/core/impl/CoreSessionCallback.java | 9 +- 
.../impl/wireformat/ReplicationAddMessage.java | 14 +- .../wireformat/ReplicationAddTXMessage.java | 14 +- .../wireformat/ReplicationPageWriteMessage.java | 2 +- .../core/remoting/server/RemotingService.java | 4 + .../server/impl/RemotingServiceImpl.java | 11 +- .../core/replication/ReplicatedJournal.java | 52 +- .../core/replication/ReplicationEndpoint.java | 7 +- .../core/replication/ReplicationManager.java | 11 +- .../core/server/ActiveMQServerLogger.java | 8 +- .../activemq/artemis/core/server/Bindable.java | 6 +- .../artemis/core/server/LargeServerMessage.java | 3 +- .../artemis/core/server/MessageReference.java | 10 +- .../activemq/artemis/core/server/Queue.java | 3 +- .../artemis/core/server/ServerMessage.java | 78 -- .../artemis/core/server/ServerSession.java | 23 +- .../core/server/cluster/Transformer.java | 4 +- .../core/server/cluster/impl/BridgeImpl.java | 14 +- .../cluster/impl/ClusterConnectionBridge.java | 13 +- .../core/server/cluster/impl/Redistributor.java | 3 +- .../cluster/impl/RemoteQueueBindingImpl.java | 13 +- .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../artemis/core/server/impl/DivertImpl.java | 9 +- .../artemis/core/server/impl/JournalLoader.java | 6 +- .../core/server/impl/LastValueQueue.java | 9 +- .../core/server/impl/MessageReferenceImpl.java | 24 +- .../server/impl/PostOfficeJournalLoader.java | 7 +- .../artemis/core/server/impl/QueueImpl.java | 60 +- .../artemis/core/server/impl/RefsOperation.java | 4 +- .../core/server/impl/ScaleDownHandler.java | 37 +- .../core/server/impl/ServerConsumerImpl.java | 30 +- .../core/server/impl/ServerMessageImpl.java | 341 ------ .../core/server/impl/ServerSessionImpl.java | 156 +-- .../server/management/ManagementService.java | 7 +- .../management/impl/ManagementServiceImpl.java | 15 +- .../core/transaction/TransactionDetail.java | 8 +- .../transaction/impl/CoreTransactionDetail.java | 11 +- .../spi/core/protocol/MessageConverter.java | 10 +- .../spi/core/protocol/MessagePersister.java | 88 
++ .../spi/core/protocol/ProtocolManager.java | 12 +- .../core/protocol/ProtocolManagerFactory.java | 15 + .../spi/core/protocol/SessionCallback.java | 6 +- .../resources/schema/artemis-configuration.xsd | 16 - .../core/config/impl/ConfigurationImplTest.java | 9 - .../artemis/core/filter/impl/FilterTest.java | 12 +- .../group/impl/ClusteredResetMockTest.java | 7 +- .../impl/ScheduledDeliveryHandlerTest.java | 200 ++-- .../transaction/impl/TransactionImplTest.java | 16 +- .../artemis/tests/util/ActiveMQTestBase.java | 14 +- .../resources/ConfigurationTest-full-config.xml | 2 - .../test/resources/artemis-configuration.xsd | 16 - .../jms/example/HatColourChangeTransformer.java | 4 +- .../example/AddForwardingTimeTransformer.java | 7 +- pom.xml | 5 +- .../PartialPooledByteBufAllocator.java | 5 + .../amqp/client/util/UnmodifiableDelivery.java | 6 + .../journal/gcfree/EncodersBench.java | 5 +- .../byteman/JMSBridgeReconnectionTest.java | 4 +- .../tests/extras/byteman/MessageCopyTest.java | 163 --- .../integration/DuplicateDetectionTest.java | 6 +- .../amqp/AmqpDescribedTypePayloadTest.java | 6 +- .../integration/amqp/AmqpSendReceiveTest.java | 21 - .../tests/integration/amqp/ProtonTest.java | 73 +- .../integration/client/AckBatchSizeTest.java | 14 +- .../integration/client/AcknowledgeTest.java | 172 ++- .../tests/integration/client/ConsumerTest.java | 265 ++++- .../integration/client/HangConsumerTest.java | 7 +- .../InVMNonPersistentMessageBufferTest.java | 36 +- .../client/InterruptedLargeMessageTest.java | 10 +- .../integration/client/LargeMessageTest.java | 5 +- .../integration/clientcrash/ClientExitTest.java | 4 +- .../integration/cluster/bridge/BridgeTest.java | 10 +- .../cluster/bridge/SimpleTransformer.java | 6 +- .../distribution/ClusterHeadersRemovedTest.java | 5 +- .../distribution/MessageRedistributionTest.java | 4 +- .../tests/integration/divert/DivertTest.java | 5 +- .../interceptors/InterceptorTest.java | 8 +- .../integration/journal/MessageJournalTest.java 
| 130 +++ .../journal/NIOJournalCompactTest.java | 6 +- .../integration/karaf/ArtemisFeatureTest.java | 2 + .../management/ManagementHelperTest.java | 8 +- .../management/ManagementServiceImplTest.java | 25 +- .../integration/paging/PagingSendTest.java | 3 +- .../tests/integration/paging/PagingTest.java | 4 +- .../DeleteMessagesOnStartupTest.java | 10 +- .../persistence/ExportFormatTest.java | 28 +- .../replication/ReplicationTest.java | 71 +- .../integration/server/FakeStorageManager.java | 6 +- .../tests/integration/server/ScaleDownTest.java | 4 +- .../ssl/CoreClientOverOneWaySSLTest.java | 4 +- .../ssl/CoreClientOverTwoWaySSLTest.java | 5 +- .../storage/PersistMultiThreadTest.java | 30 +- .../stress/paging/PageCursorStressTest.java | 24 +- .../core/server/impl/QueueConcurrentTest.java | 6 +- tests/unit-tests/pom.xml | 6 + .../core/journal/impl/JournalImplTestUnit.java | 2 +- .../unit/core/message/impl/MessageImplTest.java | 14 +- .../tests/unit/core/paging/impl/PageTest.java | 42 +- .../core/paging/impl/PagingManagerImplTest.java | 16 +- .../core/paging/impl/PagingStoreImplTest.java | 64 +- .../core/postoffice/impl/BindingsImplTest.java | 16 +- .../unit/core/postoffice/impl/FakeQueue.java | 9 +- .../impl/WildcardAddressManagerUnitTest.java | 12 +- .../unit/core/server/impl/QueueImplTest.java | 4 +- .../unit/core/server/impl/fakes/FakeFilter.java | 7 +- .../server/impl/fakes/FakeJournalLoader.java | 6 +- .../core/server/impl/fakes/FakePostOffice.java | 22 +- .../tests/unit/util/FakePagingManager.java | 7 +- .../artemis/tests/unit/util/MemorySizeTest.java | 4 +- .../artemis/tests/unit/util/UTF8Test.java | 10 +- 278 files changed, 8452 insertions(+), 7737 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java ---------------------------------------------------------------------- 
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java index 408aef5..2816aaf 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -50,16 +51,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is 
running)") public class PrintData extends OptionalLocking { + static { + MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance()); + } + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index 4f99181..d2f6204 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.cli.commands.tools; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; import java.io.File; import java.io.OutputStream; import java.lang.reflect.InvocationHandler; @@ -33,14 +36,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import javax.xml.stream.XMLOutputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamWriter; - +import io.airlift.airline.Command; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import 
org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.config.Configuration; @@ -50,7 +52,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -74,8 +76,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; @@ -83,8 +83,6 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; -import io.airlift.airline.Command; - @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends OptionalLocking { @@ -220,7 +218,9 @@ public final class XmlDataExporter extends OptionalLocking { Object o = DescribeJournal.newObjectEncoding(info, storageManager); if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) 
{ - messages.put(info.id, ((MessageDescribe) o).getMsg()); + messages.put(info.id, ((MessageDescribe) o).getMsg().toCore()); + } else if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE_PROTOCOL) { + messages.put(info.id, ((MessageDescribe) o).getMsg().toCore()); } else if (info.getUserRecordType() == JournalRecordIds.ADD_LARGE_MESSAGE) { messages.put(info.id, ((MessageDescribe) o).getMsg()); } else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) { @@ -361,13 +361,13 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end BINDINGS_PARENT } - private void printAllMessagesAsXML() throws XMLStreamException { + private void printAllMessagesAsXML() throws Exception { xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT); // Order here is important. We must process the messages from the journal before we process those from the page // files in order to get the messages in the right order. for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) { - printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey()))); + printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey()))); } printPagedMessagesAsXML(); @@ -439,7 +439,7 @@ public final class XmlDataExporter extends OptionalLocking { } if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) { - printSingleMessageAsXML(message.getMessage(), queueNames); + printSingleMessageAsXML(message.getMessage().toCore(), queueNames); } messageId++; @@ -456,20 +456,20 @@ public final class XmlDataExporter extends OptionalLocking { } } - private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException { + private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception { 
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); printMessageAttributes(message); printMessageProperties(message); printMessageQueues(queues); - printMessageBody(message); + printMessageBody(message.toCore()); xmlWriter.writeEndElement(); // end MESSAGES_CHILD messagesPrinted++; } - private void printMessageBody(ServerMessage message) throws XMLStreamException { + private void printMessageBody(Message message) throws Exception { xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); - if (message.isLargeMessage()) { + if (message.toCore().isLargeMessage()) { printLargeMessageBody((LargeServerMessage) message); } else { xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message)); @@ -479,10 +479,10 @@ public final class XmlDataExporter extends OptionalLocking { private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); - BodyEncoder encoder = null; + LargeBodyEncoder encoder = null; try { - encoder = message.getBodyEncoder(); + encoder = message.toCore().getBodyEncoder(); encoder.open(); long totalBytesWritten = 0; Long bufferSize; @@ -522,7 +522,7 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end QUEUES_PARENT } - private void printMessageProperties(ServerMessage message) throws XMLStreamException { + private void printMessageProperties(Message message) throws XMLStreamException { xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT); for (SimpleString key : message.getPropertyNames()) { Object value = message.getObjectProperty(key); @@ -539,7 +539,7 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end PROPERTIES_PARENT } - private void printMessageAttributes(ServerMessage message) throws XMLStreamException { + private void printMessageAttributes(ICoreMessage message) throws XMLStreamException { 
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java index 8ee7678..7711648 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java @@ -17,10 +17,9 @@ package org.apache.activemq.artemis.cli.commands.tools; import com.google.common.base.Preconditions; - +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.utils.Base64; /** @@ -92,12 +91,12 @@ public class XmlDataExporterUtil { * @param message * @return */ - public static String encodeMessageBody(final ServerMessage message) { + public static String encodeMessageBody(final Message message) throws Exception { Preconditions.checkNotNull(message, "ServerMessage can not be null"); - int size = message.getEndOfBodyPosition() - message.getBodyBuffer().readerIndex(); - byte[] buffer = new byte[size]; - message.getBodyBuffer().readBytes(buffer); + ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer(); + byte[] buffer = new byte[byteBuffer.writerIndex()]; + 
byteBuffer.readBytes(buffer); return XmlDataExporterUtil.encode(buffer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java index 8e2bb9f..518d231 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java @@ -45,7 +45,9 @@ import java.util.UUID; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -59,11 +61,9 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.cli.commands.ActionAbstract; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ClassloadingUtil; import 
org.apache.activemq.artemis.utils.ListUtil; @@ -298,7 +298,7 @@ public final class XmlDataImporter extends ActionAbstract { switch (eventType) { case XMLStreamConstants.START_ELEMENT: if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { - processMessageBody(message); + processMessageBody(message.toCore()); } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { processMessageProperties(message); } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { @@ -387,7 +387,7 @@ public final class XmlDataImporter extends ActionAbstract { logger.debug(logMessage); } - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array()); try (ClientProducer producer = session.createProducer(destination)) { producer.send(message); } @@ -469,7 +469,7 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageBody(final Message message) throws XMLStreamException, IOException { + private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { boolean isLarge = false; for (int i = 0; i < reader.getAttributeCount(); i++) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java new file mode 100644 index 0000000..2f00c5d --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis; + +public interface Closeable { + void close(boolean failed); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java index 5446f3f..3a208a6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java @@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput { */ void writeBytes(ByteBuffer src); + + /** + * Transfers the specified source buffer's data to this buffer starting at + * the current {@code writerIndex} until the source buffer's position + * reaches its limit, and increases the {@code writerIndex} by the + * number of the transferred bytes. + * + * @param src The source buffer + * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than + * {@code this.writableBytes} + */ + void writeBytes(ByteBuf src, int srcIndex, int length); + /** * Returns a copy of this buffer's readable bytes. 
Modifying the content * of the returned buffer or this buffer does not affect each other at all. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java index 32f9279..25fcfea 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; @@ -76,6 +77,20 @@ public final class ActiveMQBuffers { } /** + * Creates an ActiveMQBuffer wrapping an underlying ByteBuf + * + * The position on this buffer won't affect the position on the inner buffer + * + * @param underlying the underlying Netty ByteBuf + * @return an ActiveMQBuffer wrapping the underlying Netty ByteBuf + */ + public static ActiveMQBuffer wrappedBuffer(final ByteBuf underlying) { + ActiveMQBuffer buff = new ChannelBufferWrapper(underlying.duplicate()); + + return buff; + } + + /** * Creates an ActiveMQBuffer wrapping an underlying byte array * * @param underlying the underlying byte array http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index b7f70c6..e8530e6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -134,6 +135,39 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl } + public static SimpleString readNullableSimpleString(ByteBuf buffer) { + int b = buffer.readByte(); + if (b == DataConstants.NULL) { + return null; + } + return readSimpleString(buffer); + } + + + public static SimpleString readSimpleString(ByteBuf buffer) { + int len = buffer.readInt(); + byte[] data = new byte[len]; + buffer.readBytes(data); + return new SimpleString(data); + } + + public static void writeNullableSimpleString(ByteBuf buffer, SimpleString val) { + if (val == null) { + buffer.writeByte(DataConstants.NULL); + } else { + buffer.writeByte(DataConstants.NOT_NULL); + writeSimpleString(buffer, val); + } + } + + public static void writeSimpleString(ByteBuf buffer, SimpleString val) { + byte[] data = val.getData(); + buffer.writeInt(data.length); + buffer.writeBytes(data); + } + + + public SimpleString subSeq(final int start, final int end) { int len = data.length >> 1; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java index 690dbd7..92314e2 100644 --- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java @@ -66,11 +66,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public SimpleString readNullableSimpleString() { - int b = buffer.readByte(); - if (b == DataConstants.NULL) { - return null; - } - return readSimpleStringInternal(); + return SimpleString.readNullableSimpleString(buffer); } @Override @@ -84,14 +80,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public SimpleString readSimpleString() { - return readSimpleStringInternal(); - } - - private SimpleString readSimpleStringInternal() { - int len = buffer.readInt(); - byte[] data = new byte[len]; - buffer.readBytes(data); - return new SimpleString(data); + return SimpleString.readSimpleString(buffer); } @Override @@ -111,11 +100,22 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { } else if (len < 0xfff) { return readUTF(); } else { - return readSimpleStringInternal().toString(); + return SimpleString.readSimpleString(buffer).toString(); + } } @Override + public void writeNullableString(String val) { + UTF8Util.writeNullableString(buffer, val); + } + + @Override + public void writeUTF(String utf) { + UTF8Util.saveUTF(buffer, utf); + } + + @Override public String readUTF() { return UTF8Util.readUTF(this); } @@ -127,62 +127,17 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public void writeNullableSimpleString(final SimpleString val) { - if (val == null) { - buffer.writeByte(DataConstants.NULL); - } else { - buffer.writeByte(DataConstants.NOT_NULL); - writeSimpleStringInternal(val); - } - } - - @Override - public void writeNullableString(final String val) { - if (val == null) { - buffer.writeByte(DataConstants.NULL); - } else { - buffer.writeByte(DataConstants.NOT_NULL); - writeStringInternal(val); - } + 
SimpleString.writeNullableSimpleString(buffer, val); } @Override public void writeSimpleString(final SimpleString val) { - writeSimpleStringInternal(val); - } - - private void writeSimpleStringInternal(final SimpleString val) { - byte[] data = val.getData(); - buffer.writeInt(data.length); - buffer.writeBytes(data); + SimpleString.writeSimpleString(buffer, val); } @Override public void writeString(final String val) { - writeStringInternal(val); - } - - private void writeStringInternal(final String val) { - int length = val.length(); - - buffer.writeInt(length); - - if (length < 9) { - // If very small it's more performant to store char by char - for (int i = 0; i < val.length(); i++) { - buffer.writeShort((short) val.charAt(i)); - } - } else if (length < 0xfff) { - // Store as UTF - this is quicker than char by char for most strings - writeUTF(val); - } else { - // Store as SimpleString, since can't store utf > 0xffff in length - writeSimpleStringInternal(new SimpleString(val)); - } - } - - @Override - public void writeUTF(final String utf) { - UTF8Util.saveUTF(this, utf); + UTF8Util.writeString(buffer, val); } @Override @@ -576,6 +531,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { } @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + buffer.writeBytes(src, srcIndex, length); + } + + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { buffer.writeBytes(src.byteBuf(), srcIndex, length); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java new file mode 100644 index 0000000..fd68a77 --- /dev/null +++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; + +public interface Persister<T extends Object> { + + int getEncodeSize(T record); + + void encode(ActiveMQBuffer buffer, T record); + + T decode(ActiveMQBuffer buffer, T record); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index bee8790..e70891d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -101,6 +101,14 @@ public class ByteUtil { } public static String bytesToHex(byte[] bytes, int groupSize) { + if (bytes == null) { + return "NULL"; + } + + if (bytes.length == 0) { + 
return "[]"; + } + char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; int outPos = 0; for (int j = 0; j < bytes.length; j++) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java index 56cec48..fda135b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -47,7 +47,6 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING; * This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification * (Version 1.1 April 12, 2002). 
* <p> - * TODO - should have typed property getters and do conversions herein */ public final class TypedProperties { @@ -62,6 +61,13 @@ public final class TypedProperties { public TypedProperties() { } + /** + * Return the number of properties + * */ + public int size() { + return properties.size(); + } + public int getMemoryOffset() { // The estimate is basically the encode size + 2 object references for each entry in the map // Note we don't include the attributes or anything else since they already included in the memory estimate @@ -321,7 +327,7 @@ public final class TypedProperties { } } - public synchronized void decode(final ActiveMQBuffer buffer) { + public synchronized void decode(final ByteBuf buffer) { byte b = buffer.readByte(); if (b == DataConstants.NULL) { @@ -406,7 +412,7 @@ public final class TypedProperties { } } - public synchronized void encode(final ActiveMQBuffer buffer) { + public synchronized void encode(final ByteBuf buffer) { if (properties == null) { buffer.writeByte(DataConstants.NULL); } else { @@ -547,7 +553,7 @@ public final class TypedProperties { abstract Object getValue(); - abstract void write(ActiveMQBuffer buffer); + abstract void write(ByteBuf buffer); abstract int encodeSize(); @@ -568,7 +574,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.NULL); } @@ -587,7 +593,7 @@ public final class TypedProperties { this.val = val; } - private BooleanValue(final ActiveMQBuffer buffer) { + private BooleanValue(final ByteBuf buffer) { val = buffer.readBoolean(); } @@ -597,7 +603,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.BOOLEAN); buffer.writeBoolean(val); } @@ -617,7 +623,7 @@ public final class TypedProperties { this.val = val; } - private ByteValue(final ActiveMQBuffer 
buffer) { + private ByteValue(final ByteBuf buffer) { val = buffer.readByte(); } @@ -627,7 +633,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.BYTE); buffer.writeByte(val); } @@ -646,7 +652,7 @@ public final class TypedProperties { this.val = val; } - private BytesValue(final ActiveMQBuffer buffer) { + private BytesValue(final ByteBuf buffer) { int len = buffer.readInt(); val = new byte[len]; buffer.readBytes(val); @@ -658,7 +664,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.BYTES); buffer.writeInt(val.length); buffer.writeBytes(val); @@ -679,7 +685,7 @@ public final class TypedProperties { this.val = val; } - private ShortValue(final ActiveMQBuffer buffer) { + private ShortValue(final ByteBuf buffer) { val = buffer.readShort(); } @@ -689,7 +695,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.SHORT); buffer.writeShort(val); } @@ -708,7 +714,7 @@ public final class TypedProperties { this.val = val; } - private IntValue(final ActiveMQBuffer buffer) { + private IntValue(final ByteBuf buffer) { val = buffer.readInt(); } @@ -718,7 +724,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.INT); buffer.writeInt(val); } @@ -737,7 +743,7 @@ public final class TypedProperties { this.val = val; } - private LongValue(final ActiveMQBuffer buffer) { + private LongValue(final ByteBuf buffer) { val = buffer.readLong(); } @@ -747,7 +753,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final 
ByteBuf buffer) { buffer.writeByte(DataConstants.LONG); buffer.writeLong(val); } @@ -766,7 +772,7 @@ public final class TypedProperties { this.val = val; } - private FloatValue(final ActiveMQBuffer buffer) { + private FloatValue(final ByteBuf buffer) { val = Float.intBitsToFloat(buffer.readInt()); } @@ -776,7 +782,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.FLOAT); buffer.writeInt(Float.floatToIntBits(val)); } @@ -796,7 +802,7 @@ public final class TypedProperties { this.val = val; } - private DoubleValue(final ActiveMQBuffer buffer) { + private DoubleValue(final ByteBuf buffer) { val = Double.longBitsToDouble(buffer.readLong()); } @@ -806,7 +812,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.DOUBLE); buffer.writeLong(Double.doubleToLongBits(val)); } @@ -825,7 +831,7 @@ public final class TypedProperties { this.val = val; } - private CharValue(final ActiveMQBuffer buffer) { + private CharValue(final ByteBuf buffer) { val = (char) buffer.readShort(); } @@ -835,7 +841,7 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.CHAR); buffer.writeShort((short) val); } @@ -854,8 +860,8 @@ public final class TypedProperties { this.val = val; } - private StringValue(final ActiveMQBuffer buffer) { - val = buffer.readSimpleString(); + private StringValue(final ByteBuf buffer) { + val = SimpleString.readSimpleString(buffer); } @Override @@ -864,9 +870,9 @@ public final class TypedProperties { } @Override - public void write(final ActiveMQBuffer buffer) { + public void write(final ByteBuf buffer) { buffer.writeByte(DataConstants.STRING); - buffer.writeSimpleString(val); + 
SimpleString.writeSimpleString(buffer, val); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java index e75395b..84e1557 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java @@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils; import java.lang.ref.SoftReference; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; @@ -29,15 +31,43 @@ import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; */ public final class UTF8Util { + + private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled(); + + private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>(); + private UTF8Util() { // utility class } + public static void writeNullableString(ByteBuf buffer, final String val) { + if (val == null) { + buffer.writeByte(DataConstants.NULL); + } else { + buffer.writeByte(DataConstants.NOT_NULL); + writeString(buffer, val); + } + } - private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled(); + public static void writeString(final ByteBuf buffer, final String val) { + int length = val.length(); - private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>(); + buffer.writeInt(length); + + if (length < 9) { + // If very small it's more performant to store char by char + 
for (int i = 0; i < val.length(); i++) { + buffer.writeShort((short) val.charAt(i)); + } + } else if (length < 0xfff) { + // Store as UTF - this is quicker than char by char for most strings + saveUTF(buffer, val); + } else { + // Store as SimpleString, since can't store utf > 0xffff in length + SimpleString.writeSimpleString(buffer, new SimpleString(val)); + } + } - public static void saveUTF(final ActiveMQBuffer out, final String str) { + public static void saveUTF(final ByteBuf out, final String str) { StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); if (str.length() > 0xffff) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java index 8013e96..cb6c8fe 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java @@ -187,12 +187,12 @@ public class TypedPropertiesTest { props.putSimpleStringProperty(keyToRemove, RandomUtil.randomSimpleString()); ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); - props.encode(buffer); + props.encode(buffer.byteBuf()); Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex()); TypedProperties decodedProps = new TypedProperties(); - decodedProps.decode(buffer); + decodedProps.decode(buffer.byteBuf()); TypedPropertiesTest.assertEqualsTypeProperties(props, decodedProps); @@ -200,7 +200,7 @@ public class TypedPropertiesTest { // After removing a property, you should still be able to encode the Property props.removeProperty(keyToRemove); - props.encode(buffer); + props.encode(buffer.byteBuf()); 
Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex()); } @@ -210,12 +210,12 @@ public class TypedPropertiesTest { TypedProperties emptyProps = new TypedProperties(); ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); - emptyProps.encode(buffer); + emptyProps.encode(buffer.byteBuf()); Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex()); TypedProperties decodedProps = new TypedProperties(); - decodedProps.decode(buffer); + decodedProps.decode(buffer.byteBuf()); TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 38ec105..c0d9db6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -262,12 +262,6 @@ public final class ActiveMQDefaultConfiguration { // The minimal number of data files before we can start compacting private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10; - // XXX Only meant to be used by project developers - private static int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1; - - // XXX Only meant to be used by project developers - private static boolean DEFAULT_RUN_SYNC_SPEED_TEST = false; - // Interval to log server specific information (e.g. 
memory usage etc) private static long DEFAULT_SERVER_DUMP_INTERVAL = -1; @@ -801,20 +795,6 @@ public final class ActiveMQDefaultConfiguration { } /** - * XXX Only meant to be used by project developers - */ - public static int getDefaultJournalPerfBlastPages() { - return DEFAULT_JOURNAL_PERF_BLAST_PAGES; - } - - /** - * XXX Only meant to be used by project developers - */ - public static boolean isDefaultRunSyncSpeedTest() { - return DEFAULT_RUN_SYNC_SPEED_TEST; - } - - /** * Interval to log server specific information (e.g. memory usage etc) */ public static long getDefaultServerDumpInterval() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java new file mode 100644 index 0000000..779470e --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +import java.io.InputStream; +import java.util.Map; + +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; + +/** + * This interface is only to determine the API of methods required for Core Messages + */ +public interface ICoreMessage extends Message { + + LargeBodyEncoder getBodyEncoder() throws ActiveMQException; + + int getHeadersAndPropertiesEncodeSize(); + + @Override + InputStream getBodyInputStream(); + + /** Returns a new Buffer slicing the current Body. */ + ActiveMQBuffer getReadOnlyBodyBuffer(); + + /** Return the type of the message */ + @Override + byte getType(); + + /** the type of the message */ + @Override + CoreMessage setType(byte type); + + /** + * We are really interested if this is a LargeServerMessage. + * @return + */ + boolean isServerMessage(); + + /** + * The body used for this message. + * @return + */ + @Override + ActiveMQBuffer getBodyBuffer(); + + int getEndOfBodyPosition(); + + + /** Used on large messages treatment */ + void copyHeadersAndProperties(final Message msg); + + /** + * @return Returns the message in Map form, useful when encoding to JSON + */ + @Override + default Map<String, Object> toMap() { + Map map = toPropertyMap(); + map.put("messageID", getMessageID()); + Object userID = getUserID(); + if (getUserID() != null) { + map.put("userID", "ID:" + userID.toString()); + } + + map.put("address", getAddress()); + map.put("type", getType()); + map.put("durable", isDurable()); + map.put("expiration", getExpiration()); + map.put("timestamp", getTimestamp()); + map.put("priority", (int)getPriority()); + + return map; + } + +}
