This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 614e55d  [AMQ-7068] Advisory messages are empty when received with a 
AMQP subscription (#312)
614e55d is described below

commit 614e55d04e8f3715a80a4027b622498a3f58f7ec
Author: Johannes Bäurle <[email protected]>
AuthorDate: Sun Nov 17 17:40:15 2019 +0100

    [AMQ-7068] Advisory messages are empty when received with a AMQP 
subscription (#312)
    
    (cherry picked from commit 6f338aa2817c221c16b6a97b7d3377daeaf42726)
---
 .../message/JMSMappingOutboundTransformer.java     | 48 ++++++++++++-
 .../message/JMSMappingOutboundTransformerTest.java | 80 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index ffe9ccc..67d0344 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import javax.jms.JMSException;
@@ -67,7 +68,11 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.TypeConversionSupport;
@@ -333,6 +338,15 @@ public class JMSMappingOutboundTransformer implements 
OutboundTransformer {
                 apMap = new HashMap<>();
             }
             apMap.put(key, value);
+
+            int messageType = message.getDataStructureType();
+            if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+                // Type of command to recognize advisory message
+                Object data = message.getDataStructure();
+                if(data != null) {
+                    apMap.put("ActiveMqDataStructureType", 
data.getClass().getSimpleName());
+                }
+            }
         }
 
         final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
@@ -376,7 +390,39 @@ public class JMSMappingOutboundTransformer implements 
OutboundTransformer {
 
         int messageType = message.getDataStructureType();
 
-        if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
+        if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+               Object data = message.getDataStructure();
+            if (data instanceof ConnectionInfo) {
+                       ConnectionInfo connectionInfo = (ConnectionInfo)data;
+                       final HashMap<String, Object> connectionMap = new 
LinkedHashMap<String, Object>();
+                       
+                       connectionMap.put("ConnectionId", 
connectionInfo.getConnectionId().getValue());
+                       connectionMap.put("ClientId", 
connectionInfo.getClientId());
+                       connectionMap.put("ClientIp", 
connectionInfo.getClientIp());
+                       connectionMap.put("UserName", 
connectionInfo.getUserName());
+                       connectionMap.put("BrokerMasterConnector", 
connectionInfo.isBrokerMasterConnector());
+                       connectionMap.put("Manageable", 
connectionInfo.isManageable());
+                       connectionMap.put("ClientMaster", 
connectionInfo.isClientMaster());
+                       connectionMap.put("FaultTolerant", 
connectionInfo.isFaultTolerant());
+                       connectionMap.put("FailoverReconnect", 
connectionInfo.isFailoverReconnect());
+                       
+                       body = new AmqpValue(connectionMap);
+            } else if (data instanceof RemoveInfo) {
+                       RemoveInfo removeInfo = 
(RemoveInfo)message.getDataStructure();
+                       final HashMap<String, Object> removeMap = new 
LinkedHashMap<String, Object>();
+                       
+               if (removeInfo.isConnectionRemove()) {
+                       removeMap.put(ConnectionId.class.getSimpleName(), 
((ConnectionId)removeInfo.getObjectId()).getValue());
+               } else if (removeInfo.isConsumerRemove()) {
+                       removeMap.put(ConsumerId.class.getSimpleName(), 
((ConsumerId)removeInfo.getObjectId()).getValue());
+                       removeMap.put("SessionId", 
((ConsumerId)removeInfo.getObjectId()).getSessionId());
+                       removeMap.put("ConnectionId", 
((ConsumerId)removeInfo.getObjectId()).getConnectionId());
+                       removeMap.put("ParentId", 
((ConsumerId)removeInfo.getObjectId()).getParentId());
+               }
+               
+               body = new AmqpValue(removeMap);
+            }
+        } else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
             Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) 
message);
 
             if (payload == null) {
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index e9da261..1d3adea 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -39,6 +39,7 @@ import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -57,6 +58,9 @@ import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -771,6 +775,82 @@ public class JMSMappingOutboundTransformerTest {
         String contents = new String(data.getArray(), data.getArrayOffset(), 
data.getLength(), StandardCharsets.UTF_8);
         assertEquals(contentString, contents);
     }
+    
+    @Test 
+    public void testConvertConnectionInfo() throws Exception {
+       String connectionId = "myConnectionId";
+       String clientId = "myClientId";
+
+       ConnectionInfo dataStructure = new ConnectionInfo();
+       dataStructure.setConnectionId(new ConnectionId(connectionId));
+       dataStructure.setClientId(clientId);
+       
+       ActiveMQMessage outbound = createMessage();
+       Map<String, String> properties = new HashMap<String, String>();
+       properties.put("originUrl", "localhost");
+       outbound.setProperties(properties);
+       outbound.setDataStructure(dataStructure);
+       outbound.onSend();
+       outbound.storeContent();
+       
+       JMSMappingOutboundTransformer transformer = new 
JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+        
+        Message amqp = encoded.decode();
+
+        assertNotNull(amqp.getApplicationProperties());
+        
+        Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
+        assertEquals(ConnectionInfo.class.getSimpleName(), 
apMap.get("ActiveMqDataStructureType"));
+        
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) 
amqp.getBody()).getValue();
+
+        assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
+        assertTrue(clientId.equals(amqpMap.get("ClientId")));
+    }
+
+    @Test 
+    public void testConvertRemoveInfo() throws Exception {
+       String connectionId = "myConnectionId";
+
+       RemoveInfo dataStructure = new RemoveInfo(new 
ConnectionId(connectionId));
+       
+       ActiveMQMessage outbound = createMessage();
+       Map<String, String> properties = new HashMap<String, String>();
+       properties.put("originUrl", "localhost");
+       outbound.setProperties(properties);
+       outbound.setDataStructure(dataStructure);
+       outbound.onSend();
+       outbound.storeContent();
+       
+       JMSMappingOutboundTransformer transformer = new 
JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+        
+        Message amqp = encoded.decode();
+
+        assertNotNull(amqp.getApplicationProperties());
+        
+        Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
+        assertEquals(RemoveInfo.class.getSimpleName(), 
apMap.get("ActiveMqDataStructureType"));
+        
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) 
amqp.getBody()).getValue();
+
+        assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
+    }
 
     //----- Test JMSDestination Handling 
-------------------------------------//
 

Reply via email to