This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8abdee29e9 ARTEMIS-4235 fix map msg conversion from OpenWire->core
8abdee29e9 is described below
commit 8abdee29e97a7c8d7c0ec6fb78380665bdc3079a
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Apr 4 12:30:26 2023 -0500
ARTEMIS-4235 fix map msg conversion from OpenWire->core
---
.../openwire/OpenWireMessageConverter.java | 10 +-
.../openwire/OpenWireMessageConverterTest.java | 13 ++
.../jms/multiprotocol/JMSMessageConsumerTest.java | 144 +++++++++++++++++++++
3 files changed, 164 insertions(+), 3 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index ebda1c18a9..182c7d832f 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -111,9 +111,13 @@ public final class OpenWireMessageConverter {
final ActiveMQBuffer body = coreMessage.getBodyBuffer();
final ByteSequence contents = messageSend.getContent();
- if (contents == null && coreType ==
org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
- body.writeNullableString(null);
- } else if (contents != null) {
+ if (contents == null) {
+ if (coreType ==
org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
+ body.writeNullableString(null);
+ } else if (coreType ==
org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
+ body.writeByte(DataConstants.NULL);
+ }
+ } else {
final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_COMPRESSED, true);
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index d10de16abd..92a1bfb3c1 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -28,7 +29,9 @@ import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageDispatch;
@@ -135,6 +138,16 @@ public class OpenWireMessageConverterTest {
}
}
+ @Test
+ public void testEmptyMapMessage() throws Exception {
+ CoreMessage artemisMessage = (CoreMessage)
OpenWireMessageConverter.inbound(new ActiveMQMapMessage().getMessage(),
openWireFormat, null);
+ assertEquals(Message.MAP_TYPE, artemisMessage.getType());
+ ActiveMQBuffer buffer = artemisMessage.getDataBuffer();
+ TypedProperties map = new TypedProperties();
+ buffer.resetReaderIndex();
+ map.decode(buffer.byteBuf());
+ }
+
@Test
public void testProducerId() throws Exception {
final String PRODUCER_ID = "123:456:789";
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
index 570f1c2eb8..988fdc14f0 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
@@ -19,6 +19,7 @@ package
org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -31,6 +32,7 @@ import
org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -193,4 +195,146 @@ public class JMSMessageConsumerTest extends
MultiprotocolJMSClientTestSupport {
assertEquals("color = 'BLUE'",
queue.getFilter().getFilterString().toString());
}
}
+
+ @Test(timeout = 30000)
+ public void testEmptyMapMessageConversionBetweenOpenWireAndAMQP() throws
Exception {
+ testEmptyMapMessageConversion(createOpenWireConnection(),
createConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testEmptyMapMessageConversionBetweenAMQPAndOpenWire() throws
Exception {
+ testEmptyMapMessageConversion(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testEmptyMapMessageConversionBetweenCoreAndAMQP() throws
Exception {
+ testEmptyMapMessageConversion(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testEmptyMapMessageConversionBetweenAMQPAndCore() throws
Exception {
+ testEmptyMapMessageConversion(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testEmptyMapMessageConversionBetweenCoreAndOpenWire() throws
Exception {
+ testEmptyMapMessageConversion(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testEmptyMapMessageConversionBetweenOpenWireAndCore() throws
Exception {
+ testEmptyMapMessageConversion(createOpenWireConnection(),
createCoreConnection());
+ }
+
+ private void testEmptyMapMessageConversion(Connection senderConnection,
Connection consumerConnection) throws Exception {
+ try {
+ Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
+
+ Session senderSession = senderConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
senderSession.createProducer(senderSession.createQueue(getQueueName()));
+ MapMessage message = senderSession.createMapMessage();
+ producer.send(message);
+
+ Message received = consumer.receive(1000);
+
+ assertNotNull("Should have received a message by now.", received);
+ assertTrue("Should be an instance of MapMessage", received instanceof
MapMessage);
+ } finally {
+ senderConnection.close();
+ consumerConnection.close();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testMapMessageConversionBetweenAMQPAndOpenWire() throws
Exception {
+ testMapMessageConversion(createConnection(), createOpenWireConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testMapMessageConversionBetweenCoreAndAMQP() throws Exception {
+ testMapMessageConversion(createCoreConnection(), createConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testMapMessageConversionBetweenAMQPAndCore() throws Exception {
+ testMapMessageConversion(createConnection(), createCoreConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testMapMessageConversionBetweenCoreAndOpenWire() throws
Exception {
+ testMapMessageConversion(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testMapMessageConversionBetweenOpenWireAndCore() throws
Exception {
+ testMapMessageConversion(createOpenWireConnection(),
createCoreConnection());
+ }
+
+ private void testMapMessageConversion(Connection senderConnection,
Connection consumerConnection) throws Exception {
+ final boolean BOOLEAN_VALUE = RandomUtil.randomBoolean();
+ final String BOOLEAN_KEY = "myBoolean";
+ final byte BYTE_VALUE = RandomUtil.randomByte();
+ final String BYTE_KEY = "myByte";
+ final byte[] BYTES_VALUE = RandomUtil.randomBytes();
+ final String BYTES_KEY = "myBytes";
+ final char CHAR_VALUE = RandomUtil.randomChar();
+ final String CHAR_KEY = "myChar";
+ final double DOUBLE_VALUE = RandomUtil.randomDouble();
+ final String DOUBLE_KEY = "myDouble";
+ final float FLOAT_VALUE = RandomUtil.randomFloat();
+ final String FLOAT_KEY = "myFloat";
+ final int INT_VALUE = RandomUtil.randomInt();
+ final String INT_KEY = "myInt";
+ final long LONG_VALUE = RandomUtil.randomLong();
+ final String LONG_KEY = "myLong";
+ final Boolean OBJECT_VALUE = RandomUtil.randomBoolean();
+ final String OBJECT_KEY = "myObject";
+ final short SHORT_VALUE = RandomUtil.randomShort();
+ final String SHORT_KEY = "myShort";
+ final String STRING_VALUE = RandomUtil.randomString();
+ final String STRING_KEY = "myString";
+
+ try {
+ Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
+
+ Session senderSession = senderConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
senderSession.createProducer(senderSession.createQueue(getQueueName()));
+ MapMessage message =
senderSession.createMapMessage();message.setBoolean(BOOLEAN_KEY, BOOLEAN_VALUE);
+ message.setByte(BYTE_KEY, BYTE_VALUE);
+ message.setBytes(BYTES_KEY, BYTES_VALUE);
+ message.setChar(CHAR_KEY, CHAR_VALUE);
+ message.setDouble(DOUBLE_KEY, DOUBLE_VALUE);
+ message.setFloat(FLOAT_KEY, FLOAT_VALUE);
+ message.setInt(INT_KEY, INT_VALUE);
+ message.setLong(LONG_KEY, LONG_VALUE);
+ message.setObject(OBJECT_KEY, OBJECT_VALUE);
+ message.setShort(SHORT_KEY, SHORT_VALUE);
+ message.setString(STRING_KEY, STRING_VALUE);
+ producer.send(message);
+
+ Message received = consumer.receive(1000);
+
+ assertNotNull("Should have received a message by now.", received);
+ assertTrue("Should be an instance of MapMessage", received instanceof
MapMessage);
+ MapMessage receivedMapMessage = (MapMessage) received;
+
+ assertEquals(BOOLEAN_VALUE,
receivedMapMessage.getBoolean(BOOLEAN_KEY));
+ assertEquals(BYTE_VALUE, receivedMapMessage.getByte(BYTE_KEY));
+ assertEqualsByteArrays(BYTES_VALUE,
receivedMapMessage.getBytes(BYTES_KEY));
+ assertEquals(CHAR_VALUE, receivedMapMessage.getChar(CHAR_KEY));
+ assertEquals(DOUBLE_VALUE, receivedMapMessage.getDouble(DOUBLE_KEY),
0);
+ assertEquals(FLOAT_VALUE, receivedMapMessage.getFloat(FLOAT_KEY), 0);
+ assertEquals(INT_VALUE, receivedMapMessage.getInt(INT_KEY));
+ assertEquals(LONG_VALUE, receivedMapMessage.getLong(LONG_KEY));
+ assertTrue(receivedMapMessage.getObject(OBJECT_KEY) instanceof
Boolean);
+ assertEquals(OBJECT_VALUE, receivedMapMessage.getObject(OBJECT_KEY));
+ assertEquals(SHORT_VALUE, receivedMapMessage.getShort(SHORT_KEY));
+ assertEquals(STRING_VALUE, receivedMapMessage.getString(STRING_KEY));
+ } finally {
+ senderConnection.close();
+ consumerConnection.close();
+ }
+ }
}