Repository: activemq-artemis Updated Branches: refs/heads/master 532317cef -> 2c8b6b4ae
ARTEMIS-1928 Fixing body conversion of LargeMessages to AMQP Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/efd966d8 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/efd966d8 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/efd966d8 Branch: refs/heads/master Commit: efd966d88d1a3ff4612c29729c8c717f075de943 Parents: 532317c Author: Clebert Suconic <[email protected]> Authored: Fri Jun 22 15:37:42 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Jun 22 15:48:15 2018 -0400 ---------------------------------------------------------------------- .../amqp/converter/CoreAmqpConverter.java | 9 ++--- .../ProtocolsMessageLoadBalancingTest.java | 37 ++++++++++++++++++-- 2 files changed, 39 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/efd966d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 66c75a8..49372db 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -60,6 +60,7 @@ import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -381,17 +382,17 @@ public class CoreAmqpConverter { // will be unknown so we check for special cases of messages with special data // encoded into the server message body. ICoreMessage internalMessage = message.getInnerMessage(); - int readerIndex = internalMessage.getBodyBuffer().readerIndex(); + + // this will represent a readOnly buffer for the message + ActiveMQBuffer buffer = internalMessage.getDataBuffer(); try { - Object s = internalMessage.getBodyBuffer().readNullableSimpleString(); + Object s = buffer.readNullableSimpleString(); if (s != null) { body = new AmqpValue(s.toString()); } } catch (Throwable ignored) { logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored); body = new AmqpValue("Conversion to AMQP error!"); - } finally { - internalMessage.getBodyBuffer().readerIndex(readerIndex); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/efd966d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java index ae41392..ae4f013 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java @@ -29,6 +29,12 @@ import java.util.Collection; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -392,9 +398,34 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { waitForBindings(1, "queues.0", 1, 1, false); - // sending Messages.. they should be load balanced - { - ConnectionFactory cf = getJmsConnectionFactory(0); + + if (protocol.equals("AMQP")) { + + + ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616"); + locator.setMinLargeMessageSize(1024); + ClientSessionFactory coreFactory = locator.createSessionFactory(); + ClientSession clientSession = coreFactory.createSession(); + ClientProducer producer = clientSession.createProducer(queueName); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + ClientMessage message = clientSession.createMessage((byte)0, true); + StringBuffer stringbuffer = new StringBuffer(); + stringbuffer.append("hello"); + if (i % 3 == 0) { + // making 1/3 of the messages to be large message + for (int j = 0; j < 10 * 1024; j++) { + stringbuffer.append(" "); + } + } + message.getBodyBuffer().writeUTF(stringbuffer.toString()); + producer.send(message); + } + coreFactory.close(); + + } else { + + // sending Messages.. they should be load balanced + ConnectionFactory cf = getJmsConnectionFactory(0); Connection cn = cf.createConnection(); Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
