This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit a57c48ec556158a9f28c8fc9c318a81b7295b24e Author: Justin Bertram <jbert...@apache.org> AuthorDate: Wed May 10 12:08:53 2023 -0500 ARTEMIS-4275 add ID to consumer notifications --- .../core/server/impl/ServerConsumerImpl.java | 2 + .../core/server/impl/ServerSessionImpl.java | 2 + ...ificationTest.java => JMSNotificationTest.java} | 51 ++++++++++++++++------ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 42295aaa8b..b720baf64c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -615,6 +615,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(session.getRemotingConnection().getClientID())); } + props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, getID()); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props); managementService.sendNotification(notification); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 1984dd8099..3284654049 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -624,6 +624,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(remotingConnection.getClientID())); 
} + props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, consumer.getID()); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); if (logger.isDebugEnabled()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.java similarity index 66% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.java index 713483613e..32e897ff21 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.java @@ -39,8 +39,9 @@ import org.junit.Before; import org.junit.Test; import static org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CLIENT_ID; +import static org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CONSUMER_NAME; -public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSupport { +public class JMSNotificationTest extends MultiprotocolJMSClientTestSupport { private ClientConsumer notificationConsumer; private String clientID; @@ -76,20 +77,20 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo @Test(timeout = 30000) public void testConsumerNotificationAMQP() throws Exception { - testConsumerNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true)); + testConsumerNotifications(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true)); } 
@Test(timeout = 30000) public void testConsumerNotificationCore() throws Exception { - testConsumerNotification(createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, clientID, true)); + testConsumerNotifications(createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, clientID, true)); } @Test(timeout = 30000) public void testConsumerNotificationOpenWire() throws Exception { - testConsumerNotification(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, clientID, true)); + testConsumerNotifications(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, clientID, true)); } - private void testConsumerNotification(Connection connection) throws Exception { + private void testConsumerNotifications(Connection connection) throws Exception { final String subscriptionName = "mySub"; try { flush(); @@ -97,16 +98,32 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo Topic topic = session.createTopic(getTopicName()); flush(); MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName); - notificationConsumer.receiveImmediate(); // clear the BINDING_ADDED notification for the subscription queue - validateClientIdOnNotification(CoreNotificationType.CONSUMER_CREATED); + Message m = receiveNotification(CoreNotificationType.CONSUMER_CREATED, notificationConsumer); + validateClientIdOnNotification(m, CoreNotificationType.CONSUMER_CREATED); + String consumerID = validatePropertyOnNotification(m, CoreNotificationType.CONSUMER_CREATED, HDR_CONSUMER_NAME, null, false); consumer.close(); - validateClientIdOnNotification(CoreNotificationType.CONSUMER_CLOSED); + m = receiveNotification(CoreNotificationType.CONSUMER_CLOSED, notificationConsumer); + validateClientIdOnNotification(m, CoreNotificationType.CONSUMER_CLOSED); + validatePropertyOnNotification(m, CoreNotificationType.CONSUMER_CLOSED, HDR_CONSUMER_NAME, consumerID, true); 
session.unsubscribe(subscriptionName); } finally { connection.close(); } } + ClientMessage receiveNotification(CoreNotificationType notificationType, ClientConsumer consumer) throws Exception { + for (;;) { + ClientMessage message = consumer.receive(1000); + if (message == null) { + return null; + } + String receivedType = message.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE); + if (String.valueOf(receivedType).equals(notificationType.toString())) { + return message; + } + } + } + @Test(timeout = 30000) public void testSessionNotificationAMQP() throws Exception { testSessionNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true)); @@ -126,19 +143,25 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo try { flush(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - validateClientIdOnNotification(CoreNotificationType.SESSION_CREATED); + validateClientIdOnNotification(notificationConsumer.receive(1000), CoreNotificationType.SESSION_CREATED); session.close(); - validateClientIdOnNotification(CoreNotificationType.SESSION_CLOSED); + validateClientIdOnNotification(notificationConsumer.receive(1000), CoreNotificationType.SESSION_CLOSED); } finally { connection.close(); } } - private void validateClientIdOnNotification(CoreNotificationType notificationType) throws ActiveMQException { - Message m = notificationConsumer.receive(1000); + private void validateClientIdOnNotification(Message m, CoreNotificationType notificationType) { + validatePropertyOnNotification(m, notificationType, HDR_CLIENT_ID, clientID, true); + } + + private String validatePropertyOnNotification(Message m, CoreNotificationType notificationType, SimpleString propertyName, String propertyValue, boolean checkValue) { assertNotNull(m); assertEquals(notificationType.toString(), m.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)); - assertTrue(m.getPropertyNames().contains(HDR_CLIENT_ID)); - 
assertEquals(clientID, m.getStringProperty(HDR_CLIENT_ID)); + assertTrue(m.getPropertyNames().contains(propertyName)); + if (checkValue) { + assertEquals(propertyValue, m.getStringProperty(propertyName)); + } + return m.getStringProperty(propertyName); } }