This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 614e55d [AMQ-7068] Advisory messages are empty when received with a
AMQP subscription (#312)
614e55d is described below
commit 614e55d04e8f3715a80a4027b622498a3f58f7ec
Author: Johannes Bäurle <[email protected]>
AuthorDate: Sun Nov 17 17:40:15 2019 +0100
[AMQ-7068] Advisory messages are empty when received with a AMQP
subscription (#312)
(cherry picked from commit 6f338aa2817c221c16b6a97b7d3377daeaf42726)
---
.../message/JMSMappingOutboundTransformer.java | 48 ++++++++++++-
.../message/JMSMappingOutboundTransformerTest.java | 80 ++++++++++++++++++++++
2 files changed, 127 insertions(+), 1 deletion(-)
diff --git
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index ffe9ccc..67d0344 100644
---
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.JMSException;
@@ -67,7 +68,11 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.TypeConversionSupport;
@@ -333,6 +338,15 @@ public class JMSMappingOutboundTransformer implements
OutboundTransformer {
apMap = new HashMap<>();
}
apMap.put(key, value);
+
+ int messageType = message.getDataStructureType();
+ if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+ // Type of command to recognize advisory message
+ Object data = message.getDataStructure();
+ if(data != null) {
+ apMap.put("ActiveMqDataStructureType",
data.getClass().getSimpleName());
+ }
+ }
}
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
@@ -376,7 +390,39 @@ public class JMSMappingOutboundTransformer implements
OutboundTransformer {
int messageType = message.getDataStructureType();
- if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
+ if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+ Object data = message.getDataStructure();
+ if (data instanceof ConnectionInfo) {
+ ConnectionInfo connectionInfo = (ConnectionInfo)data;
+ final HashMap<String, Object> connectionMap = new
LinkedHashMap<String, Object>();
+
+ connectionMap.put("ConnectionId",
connectionInfo.getConnectionId().getValue());
+ connectionMap.put("ClientId",
connectionInfo.getClientId());
+ connectionMap.put("ClientIp",
connectionInfo.getClientIp());
+ connectionMap.put("UserName",
connectionInfo.getUserName());
+ connectionMap.put("BrokerMasterConnector",
connectionInfo.isBrokerMasterConnector());
+ connectionMap.put("Manageable",
connectionInfo.isManageable());
+ connectionMap.put("ClientMaster",
connectionInfo.isClientMaster());
+ connectionMap.put("FaultTolerant",
connectionInfo.isFaultTolerant());
+ connectionMap.put("FailoverReconnect",
connectionInfo.isFailoverReconnect());
+
+ body = new AmqpValue(connectionMap);
+ } else if (data instanceof RemoveInfo) {
+ RemoveInfo removeInfo =
(RemoveInfo)message.getDataStructure();
+ final HashMap<String, Object> removeMap = new
LinkedHashMap<String, Object>();
+
+ if (removeInfo.isConnectionRemove()) {
+ removeMap.put(ConnectionId.class.getSimpleName(),
((ConnectionId)removeInfo.getObjectId()).getValue());
+ } else if (removeInfo.isConsumerRemove()) {
+ removeMap.put(ConsumerId.class.getSimpleName(),
((ConsumerId)removeInfo.getObjectId()).getValue());
+ removeMap.put("SessionId",
((ConsumerId)removeInfo.getObjectId()).getSessionId());
+ removeMap.put("ConnectionId",
((ConsumerId)removeInfo.getObjectId()).getConnectionId());
+ removeMap.put("ParentId",
((ConsumerId)removeInfo.getObjectId()).getParentId());
+ }
+
+ body = new AmqpValue(removeMap);
+ }
+ } else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage)
message);
if (payload == null) {
diff --git
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index e9da261..1d3adea 100644
---
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -39,6 +39,7 @@ import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -57,6 +58,9 @@ import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -771,6 +775,82 @@ public class JMSMappingOutboundTransformerTest {
String contents = new String(data.getArray(), data.getArrayOffset(),
data.getLength(), StandardCharsets.UTF_8);
assertEquals(contentString, contents);
}
+
+ @Test
+ public void testConvertConnectionInfo() throws Exception {
+     // A ConnectionInfo data structure must surface as an AMQP map body plus a type marker property.
+     String connectionId = "myConnectionId";
+     String clientId = "myClientId";
+     ConnectionInfo dataStructure = new ConnectionInfo();
+     dataStructure.setConnectionId(new ConnectionId(connectionId));
+     dataStructure.setClientId(clientId);
+
+     ActiveMQMessage outbound = createMessage();
+     Map<String, String> properties = new HashMap<String, String>();
+     properties.put("originUrl", "localhost");
+     outbound.setProperties(properties);
+     outbound.setDataStructure(dataStructure);
+     outbound.onSend();
+     outbound.storeContent();
+
+     JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+     EncodedMessage encoded = transformer.transform(outbound);
+     assertNotNull(encoded);
+
+     Message amqp = encoded.decode();
+
+     assertNotNull(amqp.getApplicationProperties());
+     // The simple class name of the data structure is published as an application property.
+     Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
+     assertEquals(ConnectionInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
+     // The body is expected to be an AmqpValue wrapping a map of the ConnectionInfo fields.
+     assertNotNull(amqp.getBody());
+     assertTrue(amqp.getBody() instanceof AmqpValue);
+     assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+     @SuppressWarnings("unchecked")
+     Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+     // Compare with assertEquals so that a failure reports the expected and the actual value.
+     assertEquals(connectionId, amqpMap.get("ConnectionId"));
+     assertEquals(clientId, amqpMap.get("ClientId"));
+ }
+
+ @Test
+ public void testConvertRemoveInfo() throws Exception {
+     // A RemoveInfo for a connection must surface as an AMQP map body plus a type marker property.
+     String connectionId = "myConnectionId";
+     RemoveInfo dataStructure = new RemoveInfo(new ConnectionId(connectionId));
+
+     ActiveMQMessage outbound = createMessage();
+     Map<String, String> properties = new HashMap<String, String>();
+     properties.put("originUrl", "localhost");
+     outbound.setProperties(properties);
+     outbound.setDataStructure(dataStructure);
+     outbound.onSend();
+     outbound.storeContent();
+
+     JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+     EncodedMessage encoded = transformer.transform(outbound);
+     assertNotNull(encoded);
+
+     Message amqp = encoded.decode();
+
+     assertNotNull(amqp.getApplicationProperties());
+     // The simple class name of the data structure is published as an application property.
+     Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
+     assertEquals(RemoveInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
+     // The body is expected to be an AmqpValue wrapping a map that describes the removed object.
+     assertNotNull(amqp.getBody());
+     assertTrue(amqp.getBody() instanceof AmqpValue);
+     assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+     @SuppressWarnings("unchecked")
+     Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+     // Compare with assertEquals so that a failure reports the expected and the actual value.
+     assertEquals(connectionId, amqpMap.get("ConnectionId"));
+ }
//----- Test JMSDestination Handling
-------------------------------------//