This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 51f39fc34d1f7c5c5b8797ffca2640161950064c
Author: a181321 <anton.roskv...@volvo.com>
AuthorDate: Fri Mar 15 11:15:07 2024 -0400

    ARTEMIS-4498 Expose internal queues for management and observability
---
 .../artemis/api/core/management/QueueControl.java  |   6 +
 .../src/main/webapp/plugin/js/components/queues.js |   9 +-
 .../core/management/impl/QueueControlImpl.java     |  15 ++
 .../core/management/impl/view/QueueField.java      |   3 +-
 .../core/management/impl/view/QueueView.java       |   5 +-
 .../core/postoffice/impl/PostOfficeImpl.java       |   5 +-
 .../core/server/impl/ActiveMQServerImpl.java       |   4 +-
 .../artemis/core/server/impl/AddressInfo.java      |   2 +
 .../artemis/core/server/impl/QueueImpl.java        |   2 +-
 .../distribution/SimpleSymmetricClusterTest.java   |  52 ++++++
 .../integration/management/QueueControlTest.java   |  20 +++
 .../management/QueueControlUsingCoreTest.java      |   5 +
 .../management/OpenWireManagementTest.java         | 191 ---------------------
 13 files changed, 116 insertions(+), 203 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 882d1de602..29aa30f028 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -127,6 +127,12 @@ public interface QueueControl {
    @Attribute(desc = DURABLE_PERSISTENT_SIZE_DESCRIPTION)
    long getDurablePersistentSize();
 
+   /**
+    * Returns whether this queue was created for the broker's internal use.
+    */
+   @Attribute(desc = "whether this queue was created for the broker's internal use")
+   boolean isInternalQueue();
+
    /**
     * Returns the number of scheduled messages in this queue.
     */
diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/queues.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/queues.js
index 72570de551..4d5bcd6d6b 100644
--- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/queues.js
+++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/queues.js
@@ -104,7 +104,8 @@ var Artemis;
                   {name: "Ring Size", visible: false},
                   {name: "Consumers Before Dispatch", visible: false},
                   {name: "Delay Before Dispatch", visible: false},
-                  {name: "Auto Delete", visible: false}
+                  {name: "Auto Delete", visible: false},
+                  {name: "Internal", visible: false}
              ]
         };
 
@@ -142,7 +143,8 @@ var Artemis;
                 {id: 'paused', name: 'Paused'},
                 {id: 'temporary', name: 'Temporary'},
                 {id: 'autoCreated', name: 'Auto Created'},
-                {id: 'autoDelete', name: 'Auto Delete'}
+                {id: 'autoDelete', name: 'Auto Delete'},
+                {id: 'internalQueue', name: 'Internal'}
             ],
             operationOptions: [
                 {id: 'EQUALS', name: 'Equals'},
@@ -218,7 +220,8 @@ var Artemis;
             { header: 'Ring Size', itemField: 'ringSize'},
             { header: 'Consumers Before Dispatch', itemField: 'consumersBeforeDispatch'},
             { header: 'Delay Before Dispatch', itemField: 'delayBeforeDispatch'},
-            { header: 'Auto Delete', itemField: 'autoDelete'}
+            { header: 'Auto Delete', itemField: 'autoDelete'},
+            { header: 'Internal', itemField: 'internalQueue'}
         ];
 
         ctrl.refresh = function () {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index c45627dc2c..db91541c69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1054,6 +1054,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       return value == null ? 0 : value;
    }
 
+   @Override
+   public boolean isInternalQueue() {
+      if (AuditLogger.isBaseLoggingEnabled()) {
+         AuditLogger.isInternal(queue);
+      }
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.isInternalQueue();
+      } finally {
+         blockOnIO();
+      }
+   }
+
    @Override
    public String countMessages(final String filterStr, final String groupByProperty) throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueField.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueField.java
index 12105c4e0f..d48533ff9f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueField.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueField.java
@@ -53,7 +53,8 @@ public enum QueueField {
    RING_SIZE("ringSize"),
    CONSUMERS_BEFORE_DISPATCH("consumersBeforeDispatch"),
    DELAY_BEFORE_DISPATCH("delayBeforeDispatch"),
-   AUTO_DELETE("autoDelete");
+   AUTO_DELETE("autoDelete"),
+   INTERNAL_QUEUE("internalQueue");
 
    private static final Map<String, QueueField> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java
index e346e71b73..f825e1744c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java
@@ -77,7 +77,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
          .add(QueueField.RING_SIZE.getName(), toString(queue.getRingSize()))
          .add(QueueField.CONSUMERS_BEFORE_DISPATCH.getName(), toString(queue.getConsumersBeforeDispatch()))
          .add(QueueField.DELAY_BEFORE_DISPATCH.getName(), toString(queue.getDelayBeforeDispatch()))
-         .add(QueueField.AUTO_DELETE.getName(), toString(q.isAutoDelete()));
+         .add(QueueField.AUTO_DELETE.getName(), toString(q.isAutoDelete()))
+         .add(QueueField.INTERNAL_QUEUE.getName(), toString(q.isInternalQueue()));
       return obj;
    }
 
@@ -152,6 +153,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
             return q.getConsumersBeforeDispatch();
          case DELAY_BEFORE_DISPATCH:
             return q.getDelayBeforeDispatch();
+         case INTERNAL_QUEUE:
+            return q.isInternalQueue();
          default:
             throw new IllegalArgumentException("Unsupported field, " + fieldName);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 312d39c2e6..752cb23cab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -555,9 +555,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          // only register address if it is new
          if (result) {
             try {
-               if (!addressInfo.isInternal()) {
-                  managementService.registerAddress(addressInfo);
-               }
+               managementService.registerAddress(addressInfo);
+
                if (server.hasBrokerAddressPlugins()) {
                   server.callBrokerAddressPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload));
                }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 0eeb8190ca..a24f5d8fae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -4191,9 +4191,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             throw e;
          }
 
-         if (!queueConfiguration.isInternal()) {
-            managementService.registerQueue(queue, queue.getAddress(), storageManager);
-         }
+         managementService.registerQueue(queue, queue.getAddress(), storageManager);
 
          copyRetroactiveMessages(queue);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 311a77c14c..61359cf411 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -427,6 +427,8 @@ public class AddressInfo {
       } else if (key.equals("created-timestamp")) {
          JsonNumber jsonLong = (JsonNumber) value;
          this.createdTimestamp = jsonLong.longValue();
+      } else if (key.equals("internal")) {
+         this.internal = Boolean.valueOf(value.toString());
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index fb2808ab5d..a2c77fd7af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2974,7 +2974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    /**
-    * @return the internalQueue
+    * @return if queue is internal
     */
    @Override
    public boolean isInternalQueue() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
index c650014c62..c22378198d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -35,6 +37,7 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -363,6 +366,55 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
 
    }
 
+   @Test
+   public void testSimpleSnFManagement() throws Exception {
+      final String address = "queues.testaddress";
+      final String queue = "queue0";
+
+      setupServer(0, false, isNetty());
+      setupServer(1, false, isNetty());
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, address, queue, null, false);
+      createQueue(1, address, queue, null, false);
+
+      addConsumer(0, 0, queue, null);
+      addConsumer(1, 1, queue, null);
+
+      waitForBindings(0, address, 1, 1, true);
+      waitForBindings(1, address, 1, 1, true);
+
+      waitForBindings(0, address, 1, 1, false);
+      waitForBindings(1, address, 1, 1, false);
+
+      SimpleString SnFQueueName = SimpleString.toSimpleString(
+         Arrays.stream(servers[0].getActiveMQServerControl().getQueueNames()).filter(
+            queueName -> queueName.contains(servers[0].getInternalNamingPrefix()))
+            .findFirst()
+            .orElse(null));
+
+      Assert.assertNotNull(SnFQueueName);
+
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(SnFQueueName, SnFQueueName, RoutingType.MULTICAST, servers[0].getMBeanServer());
+
+      //check that internal queue can be managed
+      queueControl.pause();
+      Assert.assertTrue(queueControl.isPaused());
+
+      queueControl.resume();
+      Assert.assertFalse(queueControl.isPaused());
+
+      closeAllConsumers();
+
+   }
+
    @Test
    public void testSimple2() throws Exception {
       setupServer(0, true, isNetty());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index b82d78f807..412cc5b033 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -278,6 +278,26 @@ public class QueueControlTest extends ManagementTestBase {
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testRegisterInternalQueues() throws Exception {
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      server.createQueue(new QueueConfiguration(queue).setDurable(durable).setInternal(true));
+
+      QueueControl queueControl = createManagementControl(queue, queue);
+      Assert.assertNotNull(queueControl);
+      Assert.assertTrue(server.locateQueue(queue).isInternalQueue());
+      Assert.assertEquals(queue.toString(), queueControl.getName());
+      Assert.assertEquals(durable, queueControl.isDurable());
+
+      //check that internal queue can be managed
+      queueControl.pause();
+      Assert.assertTrue(queueControl.isPaused());
+
+      queueControl.resume();
+      Assert.assertFalse(queueControl.isPaused());
+   }
+
    @Test
    public void testAutoDeleteAttribute() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index fe613253ba..cc2a05e9af 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -252,6 +252,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
             return (String) proxy.retrieveAttributeValue("lastValueKey");
          }
 
+         @Override
+         public boolean isInternalQueue() {
+            return (boolean) proxy.retrieveAttributeValue("internalQueue");
+         }
+
          @Override
          public int getConsumersBeforeDispatch() {
             return (Integer) proxy.retrieveAttributeValue("consumersBeforeDispatch");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
deleted file mode 100644
index e451c233ba..0000000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.openwire.management;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.advisory.ConsumerEventSource;
-import org.apache.activemq.advisory.ProducerEventSource;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.QueueConfiguration;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
-import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.jms.client.ActiveMQSession;
-import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
-import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class OpenWireManagementTest extends OpenWireTestBase {
-
-   private ActiveMQServerControl serverControl;
-   private SimpleString queueName1 = new SimpleString("queue1");
-   private SimpleString queueName2 = new SimpleString("queue2");
-   private SimpleString queueName3 = new SimpleString("queue3");
-
-   private ConnectionFactory factory;
-
-   @Parameterized.Parameters(name = "useDefault={0},supportAdvisory={1},suppressJmx={2}")
-   public static Iterable<Object[]> data() {
-      return Arrays.asList(new Object[][] {
-         {true, false, false},
-         {false, true, false},
-         {false, true, true},
-         {false, false, false},
-         {false, false, true}
-      });
-   }
-
-   private boolean useDefault;
-   private boolean supportAdvisory;
-   private boolean suppressJmx;
-
-   public OpenWireManagementTest(boolean useDefault, boolean supportAdvisory, boolean suppressJmx) {
-      this.useDefault = useDefault;
-      this.supportAdvisory = supportAdvisory;
-      this.suppressJmx = suppressJmx;
-   }
-
-   @Before
-   @Override
-   public void setUp() throws Exception {
-      super.setUp();
-      serverControl = (ActiveMQServerControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getActiveMQServerObjectName(), ActiveMQServerControl.class, mbeanServer);
-      factory = new ActiveMQConnectionFactory(urlString);
-   }
-
-   @Override
-   protected void extraServerConfig(Configuration serverConfig) {
-      serverConfig.setJMXManagementEnabled(true);
-      if (useDefault) {
-         //don't set parameters explicitly
-         return;
-      }
-      Set<TransportConfiguration> acceptorConfigs = serverConfig.getAcceptorConfigurations();
-      for (TransportConfiguration tconfig : acceptorConfigs) {
-         if ("netty".equals(tconfig.getName())) {
-            Map<String, Object> params = tconfig.getExtraParams();
-            params.put("supportAdvisory", supportAdvisory);
-            params.put("suppressInternalManagementObjects", suppressJmx);
-         }
-      }
-   }
-
-   @Test
-   public void testHiddenInternalAddress() throws Exception {
-      server.createQueue(new QueueConfiguration(queueName1).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
-      server.createQueue(new QueueConfiguration(queueName2).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
-      server.createQueue(new QueueConfiguration(queueName3).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
-
-      String[] addresses = serverControl.getAddressNames();
-      assertEquals(4, addresses.length);
-      for (String addr : addresses) {
-         assertFalse(addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX));
-      }
-
-      try (Connection connection = factory.createConnection()) {
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Destination destination = session.createQueue(queueName1.toString());
-
-         ConsumerEventSource consumerEventSource = new ConsumerEventSource(connection, destination);
-         consumerEventSource.setConsumerListener(consumerEvent -> {
-         });
-         consumerEventSource.start();
-
-         ProducerEventSource producerEventSource = new ProducerEventSource(connection, destination);
-         producerEventSource.setProducerListener(producerEvent -> {
-         });
-         producerEventSource.start();
-
-         //after that point several advisory addresses are created.
-         //make sure they are not accessible via management api.
-         addresses = serverControl.getAddressNames();
-         boolean hasInternalAddress = false;
-         for (String addr : addresses) {
-            hasInternalAddress = addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
-            if (hasInternalAddress) {
-               break;
-            }
-         }
-         assertEquals(!useDefault && supportAdvisory && !suppressJmx, hasInternalAddress);
-
-         consumerEventSource.stop();
-         producerEventSource.stop();
-      }
-   }
-
-   @Test
-   public void testHiddenInternalQueue() throws Exception {
-      server.createQueue(new QueueConfiguration(queueName1).setRoutingType(RoutingType.ANYCAST));
-
-      String[] queues = serverControl.getQueueNames();
-      assertEquals(1, queues.length);
-      for (String queue : queues) {
-         assertFalse(checkQueueFromInternalAddress(queue));
-      }
-
-      try (Connection connection = factory.createConnection()) {
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Destination destination = session.createQueue(queueName1.toString());
-
-         //this causes advisory queues to be created
-         session.createProducer(destination);
-
-         queues = serverControl.getQueueNames();
-         boolean hasInternal = false;
-         String targetQueue = null;
-         for (String queue : queues) {
-            hasInternal = checkQueueFromInternalAddress(queue);
-            if (hasInternal) {
-               targetQueue = queue;
-               break;
-            }
-         }
-         assertEquals("targetQueue: " + targetQueue, !useDefault && supportAdvisory && !suppressJmx, hasInternal);
-      }
-   }
-
-   private boolean checkQueueFromInternalAddress(String queue) throws JMSException, ActiveMQException {
-      try (Connection coreConn = coreCf.createConnection()) {
-         ActiveMQSession session = (ActiveMQSession) coreConn.createSession();
-         ClientSession coreSession = session.getCoreSession();
-         ClientSession.QueueQuery query = coreSession.queueQuery(new SimpleString(queue));
-         assertTrue("Queue doesn't exist: " + queue, query.isExists());
-         SimpleString qAddr = query.getAddress();
-         return qAddr.toString().startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
-      }
-   }
-}

Reply via email to