This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 3a48258f7d816f28968e581a65df994b442d01d0
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Mon May 8 11:58:05 2023 -0500

    ARTEMIS-2824 clientID not set on some notifications
---
 .../core/server/impl/ServerConsumerImpl.java       |   6 +-
 .../core/server/impl/ServerSessionImpl.java        |   4 +
 .../multiprotocol/JMSClientIdNotificationTest.java | 144 +++++++++++++++++++++
 .../MultiprotocolJMSClientTestSupport.java         |  22 ++--
 4 files changed, 164 insertions(+), 12 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 6630604425..42295aaa8b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -607,10 +607,14 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
          // HORNETQ-946
          props.putSimpleStringProperty(ManagementHelper.HDR_USER, 
SimpleString.toSimpleString(session.getUsername()));
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, 
SimpleString.toSimpleString(((ServerSessionImpl) 
session).getRemotingConnection().getRemoteAddress()));
+         props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, 
SimpleString.toSimpleString(session.getRemotingConnection().getRemoteAddress()));
 
          props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, 
SimpleString.toSimpleString(session.getName()));
 
+         if (session.getRemotingConnection().getClientID() != null) {
+            props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, 
SimpleString.toSimpleString(session.getRemotingConnection().getClientID()));
+         }
+
          Notification notification = new Notification(null, 
CoreNotificationType.CONSUMER_CLOSED, props);
 
          managementService.sendNotification(notification);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 6f7fb3c897..1984dd8099 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -620,6 +620,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
             props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, 
filterString);
          }
 
+         if (remotingConnection.getClientID() != null) {
+            props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, 
SimpleString.toSimpleString(remotingConnection.getClientID()));
+         }
+
          Notification notification = new Notification(null, 
CoreNotificationType.CONSUMER_CREATED, props);
 
          if (logger.isDebugEnabled()) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java
new file mode 100644
index 0000000000..713483613e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSClientIdNotificationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static 
org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CLIENT_ID;
+
+public class JMSClientIdNotificationTest extends 
MultiprotocolJMSClientTestSupport {
+
+   private ClientConsumer notificationConsumer;
+   private String clientID;
+
+   @Before
+   public void setClientID() {
+      clientID = RandomUtil.randomString();
+   }
+
+   @Before
+   public void createNotificationConsumer() throws Exception {
+      ServerLocator locator = addServerLocator(createInVMNonHALocator());
+      ClientSessionFactory sf = 
addSessionFactory(locator.createSessionFactory());
+      ClientSession session = addClientSession(sf.createSession(false, true, 
true));
+      session.start();
+      SimpleString notificationQueue = RandomUtil.randomSimpleString();
+      session.createQueue(new 
QueueConfiguration(notificationQueue).setAddress(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress()).setDurable(false));
+      notificationConsumer = 
addClientConsumer(session.createConsumer(notificationQueue));
+   }
+
+   private void flush() throws ActiveMQException {
+      ClientMessage message;
+      do {
+         message = notificationConsumer.receiveImmediate();
+      }
+      while (message != null);
+   }
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) throws Exception {
+      server.getConfiguration().addAcceptorConfiguration("invm", "vm://0");
+   }
+
+   @Test(timeout = 30000)
+   public void testConsumerNotificationAMQP() throws Exception {
+      
testConsumerNotification(createConnection(getBrokerQpidJMSConnectionURI(), 
null, null, clientID, true));
+   }
+
+   @Test(timeout = 30000)
+   public void testConsumerNotificationCore() throws Exception {
+      
testConsumerNotification(createCoreConnection(getBrokerCoreJMSConnectionString(),
 null, null, clientID, true));
+   }
+
+   @Test(timeout = 30000)
+   public void testConsumerNotificationOpenWire() throws Exception {
+      
testConsumerNotification(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
 null, null, clientID, true));
+   }
+
+   private void testConsumerNotification(Connection connection) throws 
Exception {
+      final String subscriptionName = "mySub";
+      try {
+         flush();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         flush();
+         MessageConsumer consumer = session.createDurableSubscriber(topic, 
subscriptionName);
+         notificationConsumer.receiveImmediate(); // clear the BINDING_ADDED 
notification for the subscription queue
+         validateClientIdOnNotification(CoreNotificationType.CONSUMER_CREATED);
+         consumer.close();
+         validateClientIdOnNotification(CoreNotificationType.CONSUMER_CLOSED);
+         session.unsubscribe(subscriptionName);
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 30000)
+   public void testSessionNotificationAMQP() throws Exception {
+      
testSessionNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, 
null, clientID, true));
+   }
+
+   @Test(timeout = 30000)
+   public void testSessionNotificationCore() throws Exception {
+      
testSessionNotification(createCoreConnection(getBrokerCoreJMSConnectionString(),
 null, null, clientID, true));
+   }
+
+   @Test(timeout = 30000)
+   public void testSessionNotificationOpenWire() throws Exception {
+      
testSessionNotification(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(),
 null, null, clientID, true));
+   }
+
+   private void testSessionNotification(Connection connection) throws 
Exception {
+      try {
+         flush();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         validateClientIdOnNotification(CoreNotificationType.SESSION_CREATED);
+         session.close();
+         validateClientIdOnNotification(CoreNotificationType.SESSION_CLOSED);
+      } finally {
+         connection.close();
+      }
+   }
+
+   private void validateClientIdOnNotification(CoreNotificationType 
notificationType) throws ActiveMQException {
+      Message m = notificationConsumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(notificationType.toString(), 
m.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE));
+      assertTrue(m.getPropertyNames().contains(HDR_CLIENT_ID));
+      assertEquals(clientID, m.getStringProperty(HDR_CLIENT_ID));
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
index 59b49fc3b5..c4eb134469 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/MultiprotocolJMSClientTestSupport.java
@@ -173,7 +173,7 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
       return server;
    }
 
-   protected void addConfiguration(ActiveMQServer server) {
+   protected void addConfiguration(ActiveMQServer server) throws Exception {
 
    }
 
@@ -368,11 +368,11 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
       return createCoreConnection(getBrokerCoreJMSConnectionString(), null, 
null, null, start);
    }
 
-   private Connection createCoreConnection(String connectionString,
-                                                  String username,
-                                                  String password,
-                                                  String clientId,
-                                                  boolean start) throws 
JMSException {
+   protected Connection createCoreConnection(String connectionString,
+                                             String username,
+                                             String password,
+                                             String clientId,
+                                             boolean start) throws 
JMSException {
       ActiveMQJMSConnectionFactory factory = new 
ActiveMQJMSConnectionFactory(connectionString);
 
       Connection connection = 
trackJMSConnection(factory.createConnection(username, password));
@@ -414,11 +414,11 @@ public abstract class MultiprotocolJMSClientTestSupport 
extends ActiveMQTestBase
       return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), 
null, null, null, start);
    }
 
-   private Connection createOpenWireConnection(String connectionString,
-                                                      String username,
-                                                      String password,
-                                                      String clientId,
-                                                      boolean start) throws 
JMSException {
+   protected Connection createOpenWireConnection(String connectionString,
+                                                 String username,
+                                                 String password,
+                                                 String clientId,
+                                                 boolean start) throws 
JMSException {
       ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionString);
 
       Connection connection = 
trackJMSConnection(factory.createConnection(username, password));

Reply via email to