This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 95bcfae ARTEMIS-2262: Correlate management response messages with the
request
new 7fceaeb This closes #2568
95bcfae is described below
commit 95bcfaeb702263411eebf10b0e98cf754dfd3cc7
Author: Keith Wall <[email protected]>
AuthorDate: Tue Feb 26 19:28:19 2019 +0000
ARTEMIS-2262: Correlate management response messages with the request
---
.../apache/activemq/artemis/api/core/Message.java | 9 +++
.../artemis/core/message/impl/CoreMessage.java | 11 ++++
.../artemis/protocol/amqp/broker/AMQPMessage.java | 15 +++++
.../protocol/amqp/converter/AmqpCoreConverter.java | 9 ++-
.../protocol/amqp/converter/CoreAmqpConverter.java | 14 +++--
.../amqp/converter/jms/ServerJMSMessage.java | 21 ++++---
.../management/impl/ManagementServiceImpl.java | 26 ++++++++
.../tests/integration/amqp/AmqpManagementTest.java | 69 ++++++++++++++++++++++
.../management/ManagementServiceImplTest.java | 46 +++++++++++++++
9 files changed, 207 insertions(+), 13 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 98e7f80..e701482 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -266,6 +266,15 @@ public interface Message {
return this;
}
+ default Object getCorrelationID() {
+ return null;
+ }
+
+ default Message setCorrelationID(Object correlationID) {
+
+ return this;
+ }
+
SimpleString getReplyTo();
Message setReplyTo(SimpleString address);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9fdd05b..06304bc 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -297,6 +297,17 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
return this.putIntProperty(Message.HDR_GROUP_SEQUENCE, sequence);
}
+ @Override
+ public Object getCorrelationID() {
+ return getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME);
+ }
+
+ @Override
+ public Message setCorrelationID(final Object correlationID) {
+ putObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME, correlationID);
+ return this;
+ }
+
/**
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the
message. ignored on core
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 9c95574..66e1d24 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1148,6 +1148,21 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
+ public Object getCorrelationID() {
+ return properties != null ? properties.getCorrelationId() : null;
+ }
+
+ @Override
+ public org.apache.activemq.artemis.api.core.Message setCorrelationID(final
Object correlationID) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+
+ properties.setCorrelationId(correlationID);
+ return this;
+ }
+
+ @Override
public Long getScheduledDeliveryTime() {
if (scheduledTime < 0) {
Object objscheduledTime =
getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 1fcd9ab..17ec6a2 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -325,8 +325,13 @@ public class AmqpCoreConverter {
if (properties.getReplyTo() != null) {
jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
}
- if (properties.getCorrelationId() != null) {
-
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()));
+ Object correlationID = properties.getCorrelationId();
+ if (correlationID != null) {
+ try {
+
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
+ } catch (IllegalArgumentException e) {
+
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
+ }
}
if (properties.getContentType() != null) {
jms.setStringProperty(JMS_AMQP_CONTENT_TYPE,
properties.getContentType().toString());
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 3bad75e..af85c06 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -72,6 +72,7 @@ import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -177,14 +178,19 @@ public class CoreAmqpConverter {
properties.setReplyTo(toAddress(replyTo));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
}
- String correlationId = message.getJMSCorrelationID();
- if (correlationId != null) {
+
+ Object correlationID = message.getInnerMessage().getCorrelationID();
+ if (correlationID instanceof String || correlationID instanceof
SimpleString) {
+ String c = correlationID instanceof String ? ((String) correlationID)
: ((SimpleString) correlationID).toString();
try {
-
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+
properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(c));
} catch (ActiveMQAMQPIllegalStateException e) {
- properties.setCorrelationId(correlationId);
+ properties.setCorrelationId(correlationID);
}
+ } else {
+ properties.setCorrelationId(correlationID);
}
+
long expiration = message.getJMSExpiration();
if (expiration != 0) {
long ttl = expiration - System.currentTimeMillis();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index abc324e..2ca589a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Enumeration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@@ -118,21 +117,29 @@ public class ServerJMSMessage implements Message {
@Override
public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws
JMSException {
- try {
- MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
- } catch (ActiveMQException e) {
- throw new JMSException(e.getMessage());
+ if (correlationID == null || correlationID.length == 0) {
+ throw new JMSException("Please specify a non-zero length byte[]");
}
+ message.setCorrelationID(correlationID);
}
@Override
public final String getJMSCorrelationID() throws JMSException {
- return MessageUtil.getJMSCorrelationID(message);
+
+ Object correlationID = message.getCorrelationID();
+ if (correlationID instanceof String) {
+
+ return ((String) correlationID);
+ } else if (correlationID != null) {
+ return String.valueOf(correlationID);
+ } else {
+ return null;
+ }
}
@Override
public final void setJMSCorrelationID(String correlationID) throws
JMSException {
- MessageUtil.setJMSCorrelationID(message, correlationID);
+ message.setCorrelationID(correlationID);
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index b0e3a84..0cb0f19 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.management.impl;
+import static
org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
+
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
@@ -390,6 +392,11 @@ public class ManagementServiceImpl implements
ManagementService {
reply.setType(Message.TEXT_TYPE);
reply.setReplyTo(message.getReplyTo());
+ Object correlationID = getCorrelationIdentity(message);
+ if (correlationID != null) {
+ reply.setCorrelationID(correlationID);
+ }
+
String resourceName =
message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
if (logger.isDebugEnabled()) {
logger.debug("handling management message for " + resourceName);
@@ -781,5 +788,24 @@ public class ManagementServiceImpl implements
ManagementService {
return result;
}
+ /**
+ * Correlate management responses using the Correlation ID Pattern, if the
request supplied a correlation id,
+ * or fall back to the Message ID Pattern provided the request had a
message id.
+
+ * @param request
+ * @return correlation identity
+ */
+ private Object getCorrelationIdentity(final Message request) {
+ Object correlationId = request.getCorrelationID();
+ if (correlationId == null) {
+ // CoreMessage#getUserId returns UUID, so to implement this part an
alternative API that returned object. This part of the
+ // change is a nice to have from my point of view. I suggested it for
completeness. The application could
+ // always supply unique correlation ids on the request and achieve the
same effect. I'd be happy to drop this part.
+ Object underlying = request.getUserID() != null ? request.getUserID()
: request.getStringProperty(NATIVE_MESSAGE_ID);
+ correlationId = underlying == null ? null :
String.valueOf(underlying);
+ }
+ return correlationId;
+ }
+
// Inner classes -------------------------------------------------
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
index 98b18e8..181dfa7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpManagementTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@@ -27,6 +29,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
@@ -39,6 +43,8 @@ import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
public class AmqpManagementTest extends AmqpClientTestSupport {
+ private static final Binary BINARY_CORRELATION_ID = new
Binary("mystring".getBytes(StandardCharsets.UTF_8));
+
@Test(timeout = 60000)
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = createAmqpClient();
@@ -101,4 +107,67 @@ public class AmqpManagementTest extends
AmqpClientTestSupport {
msg = createMapMessage(1, map, null);
assertEquals(msg.getByte("sequence"), sequence);
}
+
+ @Test(timeout = 60000)
+ public void testCorrelationByMessageIDUUID() throws Throwable {
+ doTestReplyCorrelation(UUID.randomUUID(), false);
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationByMessageIDString() throws Throwable {
+ doTestReplyCorrelation("mystring", false);
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationByMessageIDBinary() throws Throwable {
+ doTestReplyCorrelation(BINARY_CORRELATION_ID, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationByCorrelationIDUUID() throws Throwable {
+ doTestReplyCorrelation(UUID.randomUUID(), true);
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationByCorrelationIDString() throws Throwable {
+ doTestReplyCorrelation("mystring", true);
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationByCorrelationIDBinary() throws Throwable {
+ doTestReplyCorrelation(BINARY_CORRELATION_ID, true);
+ }
+
+ private void doTestReplyCorrelation(final Object correlationId, final
boolean sendCorrelAsCorrelation) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ String destinationAddress = getQueueName(1);
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender("activemq.management");
+ AmqpReceiver receiver = session.createReceiver(destinationAddress);
+ receiver.flow(10);
+
+ // Create request message for getQueueNames query
+ AmqpMessage request = new AmqpMessage();
+ request.setApplicationProperty("_AMQ_ResourceName",
ResourceNames.BROKER);
+ request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
+ request.setReplyToAddress(destinationAddress);
+ if (sendCorrelAsCorrelation) {
+ request.setRawCorrelationId(correlationId);
+ } else {
+ request.setRawMessageId(correlationId);
+ }
+ request.setText("[]");
+
+ sender.send(request);
+ AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(correlationId, response.getRawCorrelationId());
+ response.accept();
+ } finally {
+ connection.close();
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
index 151341f..3647db7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
@@ -31,10 +31,13 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import
org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
+import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Test;
@@ -153,6 +156,49 @@ public class ManagementServiceImplTest extends
ActiveMQTestBase {
Assert.assertEquals(queue.getName().toString(), queueControl.getName());
}
+ @Test
+ public void testCorrelateResponseByCorrelationID() throws Exception {
+ String queue = RandomUtil.randomString();
+ String address = RandomUtil.randomString();
+ String correlationID = UUIDGenerator.getInstance().generateStringUUID();
+
+ Configuration config =
createBasicConfig().setJMXManagementEnabled(false);
+
+ ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(config, false));
+ server.start();
+
+ // invoke attribute and operation on the server
+ CoreMessage message = new CoreMessage(1, 100);
+ MessageUtil.setJMSCorrelationID(message, correlationID);
+ ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER,
"createQueue", queue, address);
+
+ Message reply = server.getManagementService().handleMessage(message);
+ Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
+ Assert.assertEquals(correlationID,
MessageUtil.getJMSCorrelationID(reply));
+ }
+
+ @Test
+ public void testCorrelateResponseByMessageID() throws Exception {
+ String queue = RandomUtil.randomString();
+ String address = RandomUtil.randomString();
+ UUID messageId = UUIDGenerator.getInstance().generateUUID();
+
+ Configuration config =
createBasicConfig().setJMXManagementEnabled(false);
+
+ ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(config, false));
+ server.start();
+
+ // invoke attribute and operation on the server
+ CoreMessage message = new CoreMessage(1, 100);
+ message.setUserID(messageId);
+ ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER,
"createQueue", queue, address);
+
+ Message reply = server.getManagementService().handleMessage(message);
+ Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
+ Assert.assertEquals(messageId.toString(),
MessageUtil.getJMSCorrelationID(reply));
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------