This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 9fe24cd8e AMQ-7309 - Implement Message#isBodyAssignableTo and
Message#getBody methods (#979)
9fe24cd8e is described below
commit 9fe24cd8e3f67a7249afa32b01fc46807c5d8997
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Feb 16 18:03:14 2023 -0500
AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody methods
(#979)
AMQ-7309 - Implement Message#isBodyAssignableTo and Message#getBody
methods
---
.../java/org/apache/activemq/ActiveMQProducer.java | 28 +--
.../activemq/command/ActiveMQBytesMessage.java | 17 ++
.../activemq/command/ActiveMQMapMessage.java | 42 ++++-
.../apache/activemq/command/ActiveMQMessage.java | 24 ++-
.../activemq/command/ActiveMQObjectMessage.java | 31 +++-
.../activemq/command/ActiveMQStreamMessage.java | 5 +
.../activemq/command/ActiveMQTextMessage.java | 17 ++
.../org/apache/activemq/util/ByteSequence.java | 12 ++
.../activemq/jms2/ActiveMQJMS2MessageTest.java | 196 +++++++++++++++++++++
9 files changed, 332 insertions(+), 40 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
index 7a6c621a9..2596bf2aa 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
@@ -108,33 +108,7 @@ public class ActiveMQProducer implements JMSProducer {
if (body != null) {
try {
for (Map.Entry<String, Object> mapEntry : body.entrySet()) {
- final String key = mapEntry.getKey();
- final Object value = mapEntry.getValue();
- final Class<?> valueObject = value.getClass();
- if (String.class.isAssignableFrom(valueObject)) {
- mapMessage.setString(key, String.class.cast(value));
- } else if (Integer.class.isAssignableFrom(valueObject)) {
- mapMessage.setInt(key, Integer.class.cast(value));
- } else if (Long.class.isAssignableFrom(valueObject)) {
- mapMessage.setLong(key, Long.class.cast(value));
- } else if (Double.class.isAssignableFrom(valueObject)) {
- mapMessage.setDouble(key, Double.class.cast(value));
- } else if (Boolean.class.isAssignableFrom(valueObject)) {
- mapMessage.setBoolean(key, Boolean.class.cast(value));
- } else if (Character.class.isAssignableFrom(valueObject)) {
- mapMessage.setChar(key, Character.class.cast(value));
- } else if (Short.class.isAssignableFrom(valueObject)) {
- mapMessage.setShort(key, Short.class.cast(value));
- } else if (Float.class.isAssignableFrom(valueObject)) {
- mapMessage.setFloat(key, Float.class.cast(value));
- } else if (Byte.class.isAssignableFrom(valueObject)) {
- mapMessage.setByte(key, Byte.class.cast(value));
- } else if (byte[].class.isAssignableFrom(valueObject)) {
- byte[] array = byte[].class.cast(value);
- mapMessage.setBytes(key, array, 0, array.length);
- } else {
- mapMessage.setObject(key, value);
- }
+ mapMessage.setObject(mapEntry.getKey(),
mapEntry.getValue());
}
} catch (JMSException e) {
throw new MessageFormatRuntimeException(e.getMessage());
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 14028d732..2800050b6 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -956,4 +956,21 @@ public class ActiveMQBytesMessage extends ActiveMQMessage
implements BytesMessag
}
}
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean isBodyAssignableTo(Class c) {
+ return getContent() == null || c.isAssignableFrom(byte[].class);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> T doGetBody(Class<T> asType) {
+ //Make sure the bytes are stored before trying to copy and return
+ if (dataOut != null && getContent() == null) {
+ storeContent();
+ }
+
+ final ByteSequence content = getContent();
+ return content != null ? (T) content.toArray() : null;
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index 97edf3e9c..5619e87e8 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -174,9 +175,16 @@ public class ActiveMQMapMessage extends ActiveMQMessage
implements MapMessage {
* @throws IOException
*/
private void loadContent() throws JMSException {
+ if (getContent() != null && map.isEmpty()) {
+ map = deserialize(getContent());
+ }
+ }
+
+ private Map<String, Object> deserialize(ByteSequence content) throws
JMSException {
+ final Map<String, Object> map;
+
try {
- if (getContent() != null && map.isEmpty()) {
- ByteSequence content = getContent();
+ if (content != null) {
InputStream is = new ByteArrayInputStream(content);
if (isCompressed()) {
is = new InflaterInputStream(is);
@@ -184,10 +192,14 @@ public class ActiveMQMapMessage extends ActiveMQMessage
implements MapMessage {
DataInputStream dataIn = new DataInputStream(is);
map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
dataIn.close();
+ } else {
+ map = new HashMap<>();
}
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
+
+ return map;
}
@Override
@@ -827,4 +839,30 @@ public class ActiveMQMapMessage extends ActiveMQMessage
implements MapMessage {
initializeReading();
return map;
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean isBodyAssignableTo(Class c) throws JMSException {
+ final Map<String, Object> map = getContentMap();
+ if (map == null || map.isEmpty()) {
+ return true;
+ }
+ return c.isAssignableFrom(java.util.Map.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> T doGetBody(Class<T> asType) throws JMSException {
+ storeContent();
+ final ByteSequence content = getContent();
+ final Map<String, Object> map = content != null ? deserialize(content)
: null;
+
+ //This implementation treats an empty map as not having a body so if
empty
+ //we should return null as well
+ if (map != null && !map.isEmpty()) {
+ map.replaceAll((k, v) -> v instanceof UTF8Buffer ? v.toString() :
v);
+ return (T) map;
+ } else {
+ return null;
+ }
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
index 1dd28eebc..49144b6a8 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
@@ -795,14 +795,22 @@ public class ActiveMQMessage extends Message implements
org.apache.activemq.Mess
this.deliveryTime = deliveryTime;
}
- @Override
- public <T> T getBody(Class<T> c) throws JMSException {
- throw new UnsupportedOperationException("getBody(Class<T>) is not
supported");
- }
+ @Override
+ public final <T> T getBody(Class<T> asType) throws JMSException {
+ if (isBodyAssignableTo(asType)) {
+ return doGetBody(asType);
+ }
- @Override
- public boolean isBodyAssignableTo(Class c) throws JMSException {
- throw new UnsupportedOperationException("isBodyAssignableTo(Class) is
not supported");
- }
+ throw new MessageFormatException("Message body cannot be read as type:
" + asType);
+ }
+
+ @Override
+ public boolean isBodyAssignableTo(Class c) throws JMSException {
+ return true;
+ }
+
+ protected <T> T doGetBody(Class<T> asType) throws JMSException {
+ return null;
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
index 621fd1fdc..41b98b511 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
@@ -194,9 +194,18 @@ public class ActiveMQObjectMessage extends ActiveMQMessage
implements ObjectMess
*/
@Override
public Serializable getObject() throws JMSException {
- if (object == null && getContent() != null) {
+ final ByteSequence content = getContent();
+ if (object == null && content != null) {
+ this.object = deserialize(content);
+ }
+ return this.object;
+ }
+
+ private Serializable deserialize(ByteSequence content) throws JMSException
{
+ Serializable object = null;
+
+ if (content != null) {
try {
- ByteSequence content = getContent();
InputStream is = new ByteArrayInputStream(content);
if (isCompressed()) {
is = new InflaterInputStream(is);
@@ -216,7 +225,7 @@ public class ActiveMQObjectMessage extends ActiveMQMessage
implements ObjectMess
throw JMSExceptionSupport.create("Failed to build body from
bytes. Reason: " + e, e);
}
}
- return this.object;
+ return object;
}
@Override
@@ -277,4 +286,20 @@ public class ActiveMQObjectMessage extends ActiveMQMessage
implements ObjectMess
trustedPackages =
Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
trustAllPackages = false;
}
+
+ @Override
+ public boolean isBodyAssignableTo(Class c) throws JMSException {
+ final Serializable object = getObject();
+ if (object == null) {
+ return true;
+ }
+ return Serializable.class == c || Object.class == c ||
c.isInstance(object);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> T doGetBody(Class<T> asType) throws JMSException {
+ storeContent();
+ final ByteSequence content = getContent();
+ return content != null ? (T) deserialize(content) : null;
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index 074ba8590..db6694434 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -1218,4 +1218,9 @@ public class ActiveMQStreamMessage extends
ActiveMQMessage implements StreamMess
public String toString() {
return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " +
bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
}
+
+ @Override
+ public boolean isBodyAssignableTo(Class c) {
+ return false;
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index c70f54fa6..5b57c9935 100644
---
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -210,4 +210,21 @@ public class ActiveMQTextMessage extends ActiveMQMessage
implements TextMessage
}
return super.toString();
}
+
+ @SuppressWarnings("unchecked")
+ public boolean isBodyAssignableTo(Class c) throws JMSException {
+ /*
+ * If null the JMS spec says this method always returns true
+ * regardless of the passed in class type.
+ */
+ if (getText() == null) {
+ return true;
+ }
+ return c.isAssignableFrom(java.lang.String.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> T doGetBody(Class<T> asType) throws JMSException {
+ return (T) getText();
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
index 5a19f4349..da4dd2b64 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
@@ -17,6 +17,8 @@
package org.apache.activemq.util;
+import java.util.Arrays;
+
public class ByteSequence {
public byte[] data;
@@ -126,4 +128,14 @@ public class ByteSequence {
}
return true;
}
+
+ /**
+ * Makes a deep copy of the data into a new byte array
+ * starting at the offset.
+ *
+ * @return
+ */
+ public byte[] toArray() {
+ return Arrays.copyOfRange(getData(), getOffset(), getLength());
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTest.java
new file mode 100644
index 000000000..ac7355cdb
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+
+public class ActiveMQJMS2MessageTest {
+
+ @Test
+ public void testMessageIsAssignableTo() throws JMSException {
+ Message message = new ActiveMQMessage();
+ assertTrue(message.isBodyAssignableTo(String.class));
+ assertTrue(message.isBodyAssignableTo(Integer.class));
+ assertTrue(message.isBodyAssignableTo(Object.class));
+ }
+
+ @Test
+ public void testMessageGetBody() throws JMSException {
+ Message message = new ActiveMQMessage();
+ assertNull(message.getBody(String.class));
+ assertNull(message.getBody(Object.class));
+ }
+
+ @Test
+ public void testStringMessageIsAssignableTo() throws JMSException {
+ TextMessage nullBody = new ActiveMQTextMessage();
+ assertTrue(nullBody.isBodyAssignableTo(String.class));
+ //Spec says type is ignored and returns true if null body
+ assertTrue(nullBody.isBodyAssignableTo(Integer.class));
+
+ TextMessage message = new ActiveMQTextMessage();
+ message.setText("Test message");
+ assertTrue(message.isBodyAssignableTo(String.class));
+ assertFalse(message.isBodyAssignableTo(Integer.class));
+ }
+
+ @Test
+ public void testStringMessageGetBody() throws JMSException {
+ TextMessage nullMessage = new ActiveMQTextMessage();
+ assertNull(nullMessage.getBody(String.class));
+
+ TextMessage message = new ActiveMQTextMessage();
+ message.setText("Test message");
+ assertEquals("Test message", message.getBody(String.class));
+ }
+
+ @Test(expected = MessageFormatException.class)
+ public void testStringMessageGetBodyWrongType() throws JMSException {
+ TextMessage message = new ActiveMQTextMessage();
+ message.setText("Test message");
+ message.getBody(Integer.class);
+ }
+
+ @Test
+ public void testByteMessageIsAssignableTo() throws JMSException {
+ BytesMessage nullBody = new ActiveMQBytesMessage();
+ //Spec says type is ignored and returns true if null body
+ assertTrue(nullBody.isBodyAssignableTo(String.class));
+ assertTrue(nullBody.isBodyAssignableTo(Integer.class));
+
+ ByteSequence testBytes = new ByteSequence("test".getBytes());
+ ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+ message.setContent(testBytes);
+ assertArrayEquals(testBytes.getData(), message.getBody(byte[].class));
+ }
+
+ @Test
+ public void testByteMessageGetBody() throws JMSException {
+ BytesMessage nullMessage = new ActiveMQBytesMessage();
+ assertNull(nullMessage.getBody(String.class));
+
+ ByteSequence testBytes = new ByteSequence("test".getBytes());
+ ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+ message.setContent(testBytes);
+ assertArrayEquals(testBytes.getData(), message.getBody(byte[].class));
+ }
+
+ @Test(expected = MessageFormatException.class)
+ public void testByteMessageGetBodyWrongType() throws JMSException {
+ ByteSequence testBytes = new ByteSequence("test".getBytes());
+ ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+ message.setContent(testBytes);
+ message.getBody(Integer.class);
+ }
+
+ @Test
+ public void testObjectMessageIsAssignableTo() throws JMSException {
+ ObjectMessage nullBody = new ActiveMQObjectMessage();
+ //Spec says type is ignored and returns true if null body
+ assertTrue(nullBody.isBodyAssignableTo(String.class));
+ assertTrue(nullBody.isBodyAssignableTo(Object.class));
+ assertTrue(nullBody.isBodyAssignableTo(Integer.class));
+
+ ObjectMessage message = new ActiveMQObjectMessage();
+ message.setObject("Test message");
+ assertTrue(message.isBodyAssignableTo(String.class));
+ assertFalse(message.isBodyAssignableTo(Integer.class));
+ }
+
+ @Test
+ public void testObjectMessageGetBody() throws JMSException {
+ ObjectMessage nullMessage = new ActiveMQObjectMessage();
+ assertNull(nullMessage.getBody(String.class));
+
+ ObjectMessage message = new ActiveMQObjectMessage();
+ message.setObject("Test message");
+ assertEquals("Test message", message.getBody(Serializable.class));
+ assertEquals("Test message", message.getBody(String.class));
+ }
+
+ @Test(expected = MessageFormatException.class)
+ public void testObjectMessageGetBodyWrongType() throws JMSException {
+ ObjectMessage message = new ActiveMQObjectMessage();
+ message.setObject("Test message");
+ message.getBody(Integer.class);
+ }
+
+ @Test
+ public void testMapMessageIsAssignableTo() throws JMSException {
+ MapMessage nullBody = new ActiveMQMapMessage();
+ //Spec says type is ignored and returns true if null body
+ assertTrue(nullBody.isBodyAssignableTo(String.class));
+ assertTrue(nullBody.isBodyAssignableTo(Integer.class));
+ assertTrue(nullBody.isBodyAssignableTo(Map.class));
+ }
+
+ @Test
+ public void testMapMessageGetBody() throws JMSException {
+ MapMessage nullMessage = new ActiveMQMapMessage();
+ assertNull(nullMessage.getBody(Map.class));
+
+ MapMessage message = new ActiveMQMapMessage();
+ message.setString("testkey", "testvalue");
+ assertEquals("testvalue", message.getBody(Map.class).get("testkey"));
+ }
+
+ @Test(expected = MessageFormatException.class)
+ public void testMapMessageGetBodyWrongType() throws JMSException {
+ MapMessage message = new ActiveMQMapMessage();
+ message.setString("testkey", "testvalue");
+ message.getBody(String.class);
+ }
+
+ @Test
+ public void testStreamMessageIsAssignableTo() throws JMSException {
+ StreamMessage nullBody = new ActiveMQStreamMessage();
+ //Spec says always false
+ assertFalse(nullBody.isBodyAssignableTo(String.class));
+ assertFalse(nullBody.isBodyAssignableTo(Integer.class));
+ assertFalse(nullBody.isBodyAssignableTo(Map.class));
+ }
+
+ @Test(expected = MessageFormatException.class)
+ public void testStreamMessageGetBody() throws JMSException {
+ StreamMessage message = new ActiveMQStreamMessage();
+ //spec says always throws exception
+ message.getBody(Object.class);
+ }
+}