This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit a57c48ec556158a9f28c8fc9c318a81b7295b24e
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Wed May 10 12:08:53 2023 -0500

    ARTEMIS-4275 add ID to consumer notifications
---
 .../core/server/impl/ServerConsumerImpl.java       |  2 +
 .../core/server/impl/ServerSessionImpl.java        |  2 +
 ...ificationTest.java => JMSNotificationTest.java} | 51 ++++++++++++++++------
 3 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 42295aaa8b..b720baf64c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -615,6 +615,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, 
SimpleString.toSimpleString(session.getRemotingConnection().getClientID()));
          }
 
+         props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, getID());
+
          Notification notification = new Notification(null, 
CoreNotificationType.CONSUMER_CLOSED, props);
 
          managementService.sendNotification(notification);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 1984dd8099..3284654049 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -624,6 +624,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, 
SimpleString.toSimpleString(remotingConnection.getClientID()));
          }
 
+         props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, 
consumer.getID());
+
          Notification notification = new Notification(null, 
CoreNotificationType.CONSUMER_CREATED, props);
 
          if (logger.isDebugEnabled()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.java
similarity index 66%
rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java
rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.java
index 713483613e..32e897ff21 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSNotificationTest.java
@@ -39,8 +39,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static 
org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CLIENT_ID;
+import static 
org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CONSUMER_NAME;
 
-public class JMSClientIdNotificationTest extends 
MultiprotocolJMSClientTestSupport {
+public class JMSNotificationTest extends MultiprotocolJMSClientTestSupport {
 
    private ClientConsumer notificationConsumer;
    private String clientID;
@@ -76,20 +77,20 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo
 
    @Test(timeout = 30000)
    public void testConsumerNotificationAMQP() throws Exception {
-      
testConsumerNotification(createConnection(getBrokerQpidJMSConnectionURI(), 
null, null, clientID, true));
+      
testConsumerNotifications(createConnection(getBrokerQpidJMSConnectionURI(), 
null, null, clientID, true));
    }
 
    @Test(timeout = 30000)
    public void testConsumerNotificationCore() throws Exception {
-      
testConsumerNotification(createCoreConnection(getBrokerCoreJMSConnectionString(),
 null, null, clientID, true));
+      
testConsumerNotifications(createCoreConnection(getBrokerCoreJMSConnectionString(),
 null, null, clientID, true));
    }
 
    @Test(timeout = 30000)
    public void testConsumerNotificationOpenWire() throws Exception {
-      
testConsumerNotification(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
 null, null, clientID, true));
+      
testConsumerNotifications(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
 null, null, clientID, true));
    }
 
-   private void testConsumerNotification(Connection connection) throws 
Exception {
+   private void testConsumerNotifications(Connection connection) throws 
Exception {
       final String subscriptionName = "mySub";
       try {
          flush();
@@ -97,16 +98,32 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo
          Topic topic = session.createTopic(getTopicName());
          flush();
          MessageConsumer consumer = session.createDurableSubscriber(topic, 
subscriptionName);
-         notificationConsumer.receiveImmediate(); // clear the BINDING_ADDED 
notification for the subscription queue
-         validateClientIdOnNotification(CoreNotificationType.CONSUMER_CREATED);
+         Message m = 
receiveNotification(CoreNotificationType.CONSUMER_CREATED, 
notificationConsumer);
+         validateClientIdOnNotification(m, 
CoreNotificationType.CONSUMER_CREATED);
+         String consumerID = validatePropertyOnNotification(m, 
CoreNotificationType.CONSUMER_CREATED, HDR_CONSUMER_NAME, null, false);
          consumer.close();
-         validateClientIdOnNotification(CoreNotificationType.CONSUMER_CLOSED);
+         m = receiveNotification(CoreNotificationType.CONSUMER_CLOSED, 
notificationConsumer);
+         validateClientIdOnNotification(m, 
CoreNotificationType.CONSUMER_CLOSED);
+         validatePropertyOnNotification(m, 
CoreNotificationType.CONSUMER_CLOSED, HDR_CONSUMER_NAME, consumerID, true);
          session.unsubscribe(subscriptionName);
       } finally {
          connection.close();
       }
    }
 
+   ClientMessage receiveNotification(CoreNotificationType notificationType, 
ClientConsumer consumer) throws Exception {
+      for (;;) {
+         ClientMessage message = consumer.receive(1000);
+         if (message == null) {
+            return null;
+         }
+         String receivedType = 
message.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
+         if (String.valueOf(receivedType).equals(notificationType.toString())) 
{
+            return message;
+         }
+      }
+   }
+
    @Test(timeout = 30000)
    public void testSessionNotificationAMQP() throws Exception {
       
testSessionNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, 
null, clientID, true));
@@ -126,19 +143,25 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo
       try {
          flush();
          Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-         validateClientIdOnNotification(CoreNotificationType.SESSION_CREATED);
+         validateClientIdOnNotification(notificationConsumer.receive(1000), 
CoreNotificationType.SESSION_CREATED);
          session.close();
-         validateClientIdOnNotification(CoreNotificationType.SESSION_CLOSED);
+         validateClientIdOnNotification(notificationConsumer.receive(1000), 
CoreNotificationType.SESSION_CLOSED);
       } finally {
          connection.close();
       }
    }
 
-   private void validateClientIdOnNotification(CoreNotificationType 
notificationType) throws ActiveMQException {
-      Message m = notificationConsumer.receive(1000);
+   private void validateClientIdOnNotification(Message m, CoreNotificationType 
notificationType) {
+      validatePropertyOnNotification(m, notificationType, HDR_CLIENT_ID, 
clientID, true);
+   }
+
+   private String validatePropertyOnNotification(Message m, 
CoreNotificationType notificationType, SimpleString propertyName, String 
propertyValue, boolean checkValue) {
       assertNotNull(m);
       assertEquals(notificationType.toString(), 
m.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE));
-      assertTrue(m.getPropertyNames().contains(HDR_CLIENT_ID));
-      assertEquals(clientID, m.getStringProperty(HDR_CLIENT_ID));
+      assertTrue(m.getPropertyNames().contains(propertyName));
+      if (checkValue) {
+         assertEquals(propertyValue, m.getStringProperty(propertyName));
+      }
+      return m.getStringProperty(propertyName);
    }
 }

Reply via email to