This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 95bcfae  ARTEMIS-2262: Correlate management response messages with the 
request
     new 7fceaeb  This closes #2568
95bcfae is described below

commit 95bcfaeb702263411eebf10b0e98cf754dfd3cc7
Author: Keith Wall <[email protected]>
AuthorDate: Tue Feb 26 19:28:19 2019 +0000

    ARTEMIS-2262: Correlate management response messages with the request
---
 .../apache/activemq/artemis/api/core/Message.java  |  9 +++
 .../artemis/core/message/impl/CoreMessage.java     | 11 ++++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 15 +++++
 .../protocol/amqp/converter/AmqpCoreConverter.java |  9 ++-
 .../protocol/amqp/converter/CoreAmqpConverter.java | 14 +++--
 .../amqp/converter/jms/ServerJMSMessage.java       | 21 ++++---
 .../management/impl/ManagementServiceImpl.java     | 26 ++++++++
 .../tests/integration/amqp/AmqpManagementTest.java | 69 ++++++++++++++++++++++
 .../management/ManagementServiceImplTest.java      | 46 +++++++++++++++
 9 files changed, 207 insertions(+), 13 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 98e7f80..e701482 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -266,6 +266,15 @@ public interface Message {
       return this;
    }
 
+   default Object getCorrelationID() {
+      return null;
+   }
+
+   default Message setCorrelationID(Object correlationID) {
+
+      return this;
+   }
+
    SimpleString getReplyTo();
 
    Message setReplyTo(SimpleString address);
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9fdd05b..06304bc 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -297,6 +297,17 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
       return this.putIntProperty(Message.HDR_GROUP_SEQUENCE, sequence);
    }
 
+   @Override
+   public Object getCorrelationID() {
+      return getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME);
+   }
+
+   @Override
+   public Message setCorrelationID(final Object correlationID) {
+      putObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME, correlationID);
+      return this;
+   }
+
    /**
     * @param sendBuffer
     * @param deliveryCount Some protocols (AMQP) will have this as part of the 
message. ignored on core
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 9c95574..66e1d24 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1148,6 +1148,21 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public Object getCorrelationID() {
+      return properties != null ? properties.getCorrelationId() : null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setCorrelationID(final 
Object correlationID) {
+      if (properties == null) {
+         properties = new Properties();
+      }
+
+      properties.setCorrelationId(correlationID);
+      return this;
+   }
+
+   @Override
    public Long getScheduledDeliveryTime() {
       if (scheduledTime < 0) {
          Object objscheduledTime = 
getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 1fcd9ab..17ec6a2 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -325,8 +325,13 @@ public class AmqpCoreConverter {
          if (properties.getReplyTo() != null) {
             jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
          }
-         if (properties.getCorrelationId() != null) {
-            
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()));
+         Object correlationID = properties.getCorrelationId();
+         if (correlationID != null) {
+            try {
+               
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
+            } catch (IllegalArgumentException e) {
+               
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
+            }
          }
          if (properties.getContentType() != null) {
             jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, 
properties.getContentType().toString());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 3bad75e..af85c06 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -72,6 +72,7 @@ import javax.jms.Topic;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -177,14 +178,19 @@ public class CoreAmqpConverter {
          properties.setReplyTo(toAddress(replyTo));
          maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
       }
-      String correlationId = message.getJMSCorrelationID();
-      if (correlationId != null) {
+
+      Object correlationID = message.getInnerMessage().getCorrelationID();
+      if (correlationID instanceof String || correlationID instanceof 
SimpleString) {
+         String c = correlationID instanceof String ? ((String) correlationID) 
: ((SimpleString) correlationID).toString();
          try {
-            
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+            
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(c));
          } catch (ActiveMQAMQPIllegalStateException e) {
-            properties.setCorrelationId(correlationId);
+            properties.setCorrelationId(correlationID);
          }
+      } else {
+         properties.setCorrelationId(correlationID);
       }
+
       long expiration = message.getJMSExpiration();
       if (expiration != 0) {
          long ttl = expiration - System.currentTimeMillis();
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index abc324e..2ca589a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Enumeration;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@@ -118,21 +117,29 @@ public class ServerJMSMessage implements Message {
 
    @Override
    public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws 
JMSException {
-      try {
-         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
-      } catch (ActiveMQException e) {
-         throw new JMSException(e.getMessage());
+      if (correlationID == null || correlationID.length == 0) {
+         throw new JMSException("Please specify a non-zero length byte[]");
       }
+      message.setCorrelationID(correlationID);
    }
 
    @Override
    public final String getJMSCorrelationID() throws JMSException {
-      return MessageUtil.getJMSCorrelationID(message);
+
+      Object correlationID = message.getCorrelationID();
+      if (correlationID instanceof String) {
+
+         return ((String) correlationID);
+      } else if (correlationID != null) {
+         return String.valueOf(correlationID);
+      } else {
+         return null;
+      }
    }
 
    @Override
    public final void setJMSCorrelationID(String correlationID) throws 
JMSException {
-      MessageUtil.setJMSCorrelationID(message, correlationID);
+      message.setCorrelationID(correlationID);
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index b0e3a84..0cb0f19 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.server.management.impl;
 
+import static 
org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
+
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanRegistrationException;
 import javax.management.MBeanServer;
@@ -390,6 +392,11 @@ public class ManagementServiceImpl implements 
ManagementService {
       reply.setType(Message.TEXT_TYPE);
       reply.setReplyTo(message.getReplyTo());
 
+      Object correlationID = getCorrelationIdentity(message);
+      if (correlationID != null) {
+         reply.setCorrelationID(correlationID);
+      }
+
       String resourceName = 
message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
       if (logger.isDebugEnabled()) {
          logger.debug("handling management message for " + resourceName);
@@ -781,5 +788,24 @@ public class ManagementServiceImpl implements 
ManagementService {
       return result;
    }
 
+   /**
+    * Correlate management responses using the Correlation ID Pattern, if the 
request supplied a correlation id,
+    * or fall back to the Message ID Pattern provided the request had a 
message id.
+    *
+    * @param request
+    * @return correlation identity
+    */
+   private Object getCorrelationIdentity(final Message request) {
+      Object correlationId = request.getCorrelationID();
+      if (correlationId == null) {
+         // CoreMessage#getUserID returns a UUID, so implementing this part needs an 
alternative API that returns an Object. This part of the
+         // change is a nice-to-have from my point of view; I suggested it for 
completeness.  The application could
+         // always supply unique correlation ids on the request and achieve the 
same effect.  I'd be happy to drop this part.
+         Object underlying = request.getUserID() != null ? request.getUserID() 
: request.getStringProperty(NATIVE_MESSAGE_ID);
+         correlationId = underlying == null ? null : 
String.valueOf(underlying);
+      }
+      return correlationId;
+   }
+
    // Inner classes -------------------------------------------------
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
index 98b18e8..181dfa7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
@@ -27,6 +29,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedLong;
@@ -39,6 +43,8 @@ import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
 
 public class AmqpManagementTest extends AmqpClientTestSupport {
 
+   private static final Binary BINARY_CORRELATION_ID = new 
Binary("mystring".getBytes(StandardCharsets.UTF_8));
+
    @Test(timeout = 60000)
    public void testManagementQueryOverAMQP() throws Throwable {
       AmqpClient client = createAmqpClient();
@@ -101,4 +107,67 @@ public class AmqpManagementTest extends 
AmqpClientTestSupport {
       msg = createMapMessage(1, map, null);
       assertEquals(msg.getByte("sequence"), sequence);
    }
+
+   @Test(timeout = 60000)
+   public void testCorrelationByMessageIDUUID() throws Throwable {
+      doTestReplyCorrelation(UUID.randomUUID(), false);
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationByMessageIDString() throws Throwable {
+      doTestReplyCorrelation("mystring", false);
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationByMessageIDBinary() throws Throwable {
+      doTestReplyCorrelation(BINARY_CORRELATION_ID, false);
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationByCorrelationIDUUID() throws Throwable {
+      doTestReplyCorrelation(UUID.randomUUID(), true);
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationByCorrelationIDString() throws Throwable {
+      doTestReplyCorrelation("mystring", true);
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationByCorrelationIDBinary() throws Throwable {
+      doTestReplyCorrelation(BINARY_CORRELATION_ID, true);
+   }
+
+   private void doTestReplyCorrelation(final Object correlationId, final 
boolean sendCorrelAsCorrelation) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         String destinationAddress = getQueueName(1);
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender("activemq.management");
+         AmqpReceiver receiver = session.createReceiver(destinationAddress);
+         receiver.flow(10);
+
+         // Create request message for getQueueNames query
+         AmqpMessage request = new AmqpMessage();
+         request.setApplicationProperty("_AMQ_ResourceName", 
ResourceNames.BROKER);
+         request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
+         request.setReplyToAddress(destinationAddress);
+         if (sendCorrelAsCorrelation) {
+            request.setRawCorrelationId(correlationId);
+         } else {
+            request.setRawMessageId(correlationId);
+         }
+         request.setText("[]");
+
+         sender.send(request);
+         AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(response);
+         Assert.assertEquals(correlationId, response.getRawCorrelationId());
+         response.accept();
+      } finally {
+         connection.close();
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
index 151341f..3647db7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
@@ -31,10 +31,13 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import 
org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager;
 import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -153,6 +156,49 @@ public class ManagementServiceImplTest extends 
ActiveMQTestBase {
       Assert.assertEquals(queue.getName().toString(), queueControl.getName());
    }
 
+   @Test
+   public void testCorrelateResponseByCorrelationID() throws Exception {
+      String queue = RandomUtil.randomString();
+      String address = RandomUtil.randomString();
+      String correlationID = UUIDGenerator.getInstance().generateStringUUID();
+
+      Configuration config = 
createBasicConfig().setJMXManagementEnabled(false);
+
+      ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(config, false));
+      server.start();
+
+      // invoke attribute and operation on the server
+      CoreMessage message = new CoreMessage(1, 100);
+      MessageUtil.setJMSCorrelationID(message, correlationID);
+      ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, 
"createQueue", queue, address);
+
+      Message reply = server.getManagementService().handleMessage(message);
+      Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
+      Assert.assertEquals(correlationID, 
MessageUtil.getJMSCorrelationID(reply));
+   }
+
+   @Test
+   public void testCorrelateResponseByMessageID() throws Exception {
+      String queue = RandomUtil.randomString();
+      String address = RandomUtil.randomString();
+      UUID messageId =  UUIDGenerator.getInstance().generateUUID();
+
+      Configuration config = 
createBasicConfig().setJMXManagementEnabled(false);
+
+      ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(config, false));
+      server.start();
+
+      // invoke attribute and operation on the server
+      CoreMessage message = new CoreMessage(1, 100);
+      message.setUserID(messageId);
+      ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, 
"createQueue", queue, address);
+
+      Message reply = server.getManagementService().handleMessage(message);
+      Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
+      Assert.assertEquals(messageId.toString(), 
MessageUtil.getJMSCorrelationID(reply));
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Reply via email to