Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3c04de3ab -> 9a8055bd3


ARTEMIS-1365 Advisory consumers listed in Console

Openwire clients create consumers to advisory topics to receive
notifications. As a result there are internal queues created
on advisory topics. Those consumers shouldn't be exposed via
the management APIs, which are used by the Console.

To fix that, the broker doesn't register any queues from
advisory addresses.

Also refactors the code to remove OpenWire-specific constants
from the AddressInfo class.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5fe88e1c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5fe88e1c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5fe88e1c

Branch: refs/heads/master
Commit: 5fe88e1c667870a398d85e427d324542399f9dfe
Parents: 3c04de3
Author: Howard Gao <howard....@gmail.com>
Authored: Fri Nov 17 17:39:18 2017 +0800
Committer: Howard Gao <howard....@gmail.com>
Committed: Fri Nov 17 17:39:40 2017 +0800

----------------------------------------------------------------------
 .../activemq/artemis/utils/PrefixUtil.java      |  13 +--
 .../protocol/openwire/OpenWireConnection.java   |  23 +++-
 .../openwire/OpenWireProtocolManager.java       |  24 ++++
 .../core/protocol/openwire/amq/AMQConsumer.java |  17 ++-
 .../core/protocol/openwire/amq/AMQSession.java  |  10 +-
 .../stomp/VersionedStompFrameHandler.java       |   3 +-
 .../artemis/core/server/ActiveMQServer.java     |   4 +
 .../artemis/core/server/ServerSession.java      |  22 +++-
 .../core/server/impl/ActiveMQServerImpl.java    | 113 +++++++++++++++++++
 .../artemis/core/server/impl/AddressInfo.java   |  36 ++++--
 .../core/server/impl/ServerSessionImpl.java     |  68 +++++++----
 .../management/impl/ManagementServiceImpl.java  |  22 +++-
 .../core/settings/HierarchicalRepository.java   |   1 +
 .../en/protocols-interoperability.md            |  27 +++++
 .../management/OpenWireManagementTest.java      | 101 ++++++++++++++++-
 15 files changed, 424 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
index e3fc5e6..5313a8a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
@@ -26,17 +26,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 
 public class PrefixUtil {
 
-   public static Pair<SimpleString, RoutingType> 
getAddressAndRoutingType(SimpleString address,
-                                                                   RoutingType 
defaultRoutingType,
-                                                                   
Map<SimpleString, RoutingType> prefixes) {
-      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
-         if (address.startsWith(entry.getKey())) {
-            return new Pair<>(removePrefix(address, entry.getKey()), 
entry.getValue());
-         }
-      }
-      return new Pair<>(address, defaultRoutingType);
-   }
-
    public static Pair<SimpleString, Set<RoutingType>> 
getAddressAndRoutingTypes(SimpleString address,
                                                                           
Set<RoutingType> defaultRoutingTypes,
                                                                           
Map<SimpleString, RoutingType> prefixes) {
@@ -59,7 +48,7 @@ public class PrefixUtil {
       return address;
    }
 
-   private static SimpleString removePrefix(SimpleString string, SimpleString 
prefix) {
+   public static SimpleString removePrefix(SimpleString string, SimpleString 
prefix) {
       return string.subSeq(prefix.length(), string.length());
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f8d5f57..19d158d 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.TempQueueObserver;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -726,15 +727,22 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    public void addDestination(DestinationInfo info) throws Exception {
       boolean created = false;
       ActiveMQDestination dest = info.getDestination();
+      if (!protocolManager.isSupportAdvisory() && 
AdvisorySupport.isAdvisoryTopic(dest)) {
+         return;
+      }
 
       SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName());
       if (server.locateQueue(qName) == null) {
          AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(dest.getPhysicalName());
+         AddressInfo addressInfo = new AddressInfo(qName, dest.isTopic() ? 
RoutingType.MULTICAST : RoutingType.ANYCAST);
+         if (AdvisorySupport.isAdvisoryTopic(dest) && 
protocolManager.isSuppressInternalManagementObjects()) {
+            addressInfo.setInternal(true);
+         }
          if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || 
dest.isTemporary())) {
-            internalSession.createQueue(qName, qName, RoutingType.ANYCAST, 
null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
+            internalSession.createQueue(addressInfo, qName, null, 
dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
             created = true;
          } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() 
|| dest.isTemporary())) {
-            internalSession.createAddress(qName, RoutingType.MULTICAST, 
!dest.isTemporary());
+            internalSession.createAddress(addressInfo, !dest.isTemporary());
             created = true;
          }
       }
@@ -783,6 +791,9 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
          }
 
          List<AMQConsumer> consumersList = amqSession.createConsumer(info, new 
SlowConsumerDetection());
+         if (consumersList.size() == 0) {
+            return;
+         }
 
          this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, 
consumersList);
          ss.addConsumer(info);
@@ -878,6 +889,14 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       return state.getTempDestinations();
    }
 
+   public boolean isSuppressInternalManagementObjects() {
+      return protocolManager.isSuppressInternalManagementObjects();
+   }
+
+   public boolean isSuppportAdvisory() {
+      return protocolManager.isSupportAdvisory();
+   }
+
    class SlowConsumerDetection implements SlowConsumerDetectionListener {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index b552c35..87925b5 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -119,6 +119,11 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, Cl
    private long maxInactivityDurationInitalDelay = 10 * 1000L;
    private boolean useKeepAlive = true;
 
+   private boolean supportAdvisory = true;
+   //prevents advisory addresses/queues to be registered
+   //to management service
+   private boolean suppressInternalManagementObjects = true;
+
    private final OpenWireMessageConverter internalConverter;
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@@ -348,6 +353,9 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, Cl
                             Command command,
                             ConsumerId targetConsumerId,
                             String originalConnectionId) throws Exception {
+      if (!this.isSupportAdvisory()) {
+         return;
+      }
       ActiveMQMessage advisoryMessage = new ActiveMQMessage();
 
       if (originalConnectionId == null) {
@@ -583,4 +591,20 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, Cl
    public OpenWireMessageConverter getInternalConverter() {
       return internalConverter;
    }
+
+   public boolean isSupportAdvisory() {
+      return supportAdvisory;
+   }
+
+   public void setSupportAdvisory(boolean supportAdvisory) {
+      this.supportAdvisory = supportAdvisory;
+   }
+
+   public boolean isSuppressInternalManagementObjects() {
+      return suppressInternalManagementObjects;
+   }
+
+   public void setSuppressInternalManagementObjects(boolean 
suppressInternalManagementObjects) {
+      this.suppressInternalManagementObjects = 
suppressInternalManagementObjects;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 57506a2..fb50582 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -63,11 +63,15 @@ public class AMQConsumer {
    private AtomicInteger currentWindow;
    private long messagePullSequence = 0;
    private MessagePullHandler messagePullHandler;
+   //internal means we don't expose
+   //it's address/queue to management service
+   private boolean internalAddress = false;
 
    public AMQConsumer(AMQSession amqSession,
                       org.apache.activemq.command.ActiveMQDestination d,
                       ConsumerInfo info,
-                      ScheduledExecutorService scheduledPool) {
+                      ScheduledExecutorService scheduledPool,
+                      boolean internalAddress) {
       this.session = amqSession;
       this.openwireDestination = d;
       this.info = info;
@@ -77,6 +81,7 @@ public class AMQConsumer {
       if (prefetchSize == 0) {
          messagePullHandler = new MessagePullHandler();
       }
+      this.internalAddress = internalAddress;
    }
 
    public void init(SlowConsumerDetectionListener 
slowConsumerDetectionListener, long nativeId) throws Exception {
@@ -143,7 +148,10 @@ public class AMQConsumer {
       AddressInfo addressInfo = 
session.getCoreServer().getAddressInfo(address);
       if (addressInfo != null) {
          addressInfo.addRoutingType(RoutingType.MULTICAST);
+      } else {
+         addressInfo = new AddressInfo(address, RoutingType.MULTICAST);
       }
+      addressInfo.setInternal(internalAddress);
       if (isDurable) {
          queueName = new 
SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true,
 clientID, subscriptionName));
          QueueQueryResult result = 
session.getCoreSession().executeQueueQuery(queueName);
@@ -166,16 +174,15 @@ public class AMQConsumer {
                session.getCoreSession().deleteQueue(queueName);
 
                // Create the new one
-               session.getCoreSession().createQueue(address, queueName, 
RoutingType.MULTICAST, selector, false, true);
+               session.getCoreSession().createQueue(addressInfo, queueName, 
selector, false, true);
             }
          } else {
-            session.getCoreSession().createQueue(address, queueName, 
RoutingType.MULTICAST, selector, false, true);
+            session.getCoreSession().createQueue(addressInfo, queueName, 
selector, false, true);
          }
       } else {
          queueName = new SimpleString(UUID.randomUUID().toString());
 
-         session.getCoreSession().createQueue(address, queueName, 
RoutingType.MULTICAST, selector, true, false);
-
+         session.getCoreSession().createQueue(addressInfo, queueName, 
selector, true, false);
       }
 
       return queueName;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 330ac35..057072b 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -159,6 +160,13 @@ public class AMQSession implements SessionCallback {
       List<AMQConsumer> consumersList = new java.util.LinkedList<>();
 
       for (ActiveMQDestination openWireDest : dests) {
+         boolean isInternalAddress = false;
+         if (AdvisorySupport.isAdvisoryTopic(dest)) {
+            if (!connection.isSuppportAdvisory()) {
+               continue;
+            }
+            isInternalAddress = 
connection.isSuppressInternalManagementObjects();
+         }
          if (openWireDest.isQueue()) {
             SimpleString queueName = new 
SimpleString(convertWildcard(openWireDest.getPhysicalName()));
 
@@ -166,7 +174,7 @@ public class AMQSession implements SessionCallback {
                throw new InvalidDestinationException("Destination doesn't 
exist: " + queueName);
             }
          }
-         AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, 
scheduledPool);
+         AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, 
scheduledPool, isInternalAddress);
 
          long nativeID = consumerIDGenerator.generateID();
          consumer.init(slowConsumerDetectionListener, nativeID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index bdae6fc..2259a17 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -350,7 +351,7 @@ public abstract class VersionedStompFrameHandler {
       if (typeHeader != null) {
          routingType = RoutingType.valueOf(typeHeader);
       } else {
-         routingType = 
connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination),
 null).getB();
+         routingType = 
connection.getSession().getCoreSession().getAddressAndRoutingType(new 
AddressInfo(new SimpleString(destination))).getRoutingType();
       }
       return routingType;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 643d2be..5eeb3db 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -318,6 +318,10 @@ public interface ActiveMQServer extends ServiceComponent {
                      SimpleString user, boolean durable, boolean temporary, 
boolean autoCreated, Integer maxConsumers,
                      Boolean purgeOnNoConsumers, boolean autoCreateAddress) 
throws Exception;
 
+   Queue createQueue(AddressInfo addressInfo, SimpleString queueName, 
SimpleString filter,
+                     SimpleString user, boolean durable, boolean temporary, 
boolean autoCreated, Integer maxConsumers,
+                     Boolean purgeOnNoConsumers, boolean autoCreateAddress) 
throws Exception;
+
    Queue createQueue(SimpleString address, RoutingType routingType, 
SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, 
boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean 
purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 7162acd..10de4dc 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -115,6 +115,12 @@ public interface ServerSession extends SecurityAuth {
                      boolean temporary,
                      boolean durable) throws Exception;
 
+   Queue createQueue(AddressInfo address,
+                     SimpleString name,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable) throws Exception;
+
    /**
     * Create queue with default delivery mode
     *
@@ -150,6 +156,13 @@ public interface ServerSession extends SecurityAuth {
                      boolean durable,
                      boolean autoCreated) throws Exception;
 
+   Queue createQueue(AddressInfo addressInfo,
+                     SimpleString name,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable,
+                     boolean autoCreated) throws Exception;
+
    AddressInfo createAddress(SimpleString address,
                              Set<RoutingType> routingTypes,
                              boolean autoCreated) throws Exception;
@@ -158,6 +171,9 @@ public interface ServerSession extends SecurityAuth {
                              RoutingType routingType,
                              boolean autoCreated) throws Exception;
 
+   AddressInfo createAddress(AddressInfo addressInfo,
+                             boolean autoCreated) throws Exception;
+
    void deleteQueue(SimpleString name) throws Exception;
 
    ServerConsumer createConsumer(long consumerID,
@@ -270,13 +286,11 @@ public interface ServerSession extends SecurityAuth {
    /**
     * Get the canonical (i.e. non-prefixed) address and the corresponding 
routing-type.
     *
-    * @param address the address to inspect
-    * @param defaultRoutingType the {@code 
org.apache.activemq.artemis.api.core.RoutingType} to return if no prefix
-    *                           match is found.
+    * @param addressInfo the address to inspect
     * @return a {@code org.apache.activemq.artemis.api.core.Pair} representing 
the canonical (i.e. non-prefixed) address
     *         name and the {@code 
org.apache.activemq.artemis.api.core.RoutingType} corresponding to the that 
prefix.
     */
-   Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString 
address, RoutingType defaultRoutingType);
+   AddressInfo getAddressAndRoutingType(AddressInfo addressInfo);
 
    /**
     * Get the canonical (i.e. non-prefixed) address and the corresponding 
routing-type.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 90b0f83..469828a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1674,6 +1674,7 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
    }
 
    @Override
+   @Deprecated
    public Queue createQueue(SimpleString address,
                             RoutingType routingType,
                             SimpleString queueName,
@@ -1688,6 +1689,11 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
       return createQueue(address, routingType, queueName, filter, user, 
durable, temporary, false, false, autoCreated, maxConsumers, 
purgeOnNoConsumers, autoCreateAddress);
    }
 
+   @Override
+   public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, 
SimpleString filter, SimpleString user, boolean durable, boolean temporary, 
boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean 
autoCreateAddress) throws Exception {
+      return createQueue(addressInfo, queueName, filter, user, durable, 
temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, 
autoCreateAddress);
+   }
+
    @Deprecated
    @Override
    public Queue createQueue(final SimpleString address,
@@ -2666,6 +2672,113 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
       return postOffice.getAddressInfo(address);
    }
 
+   public Queue createQueue(final AddressInfo addrInfo,
+                            final SimpleString queueName,
+                            final SimpleString filterString,
+                            final SimpleString user,
+                            final boolean durable,
+                            final boolean temporary,
+                            final boolean ignoreIfExists,
+                            final boolean transientQueue,
+                            final boolean autoCreated,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean autoCreateAddress) throws Exception {
+      final QueueBinding binding = (QueueBinding) 
postOffice.getBinding(queueName);
+      if (binding != null) {
+         if (ignoreIfExists) {
+            return binding.getQueue();
+         } else {
+            throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, 
binding.getAddress());
+         }
+      }
+
+      final Filter filter = FilterImpl.createFilter(filterString);
+
+      final long txID = storageManager.generateID();
+      final long queueID = storageManager.generateID();
+
+      final QueueConfig.Builder queueConfigBuilder;
+
+      final SimpleString addressToUse = addrInfo == null ? queueName : 
addrInfo.getName();
+
+      queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, 
addressToUse);
+
+      AddressInfo info = postOffice.getAddressInfo(addressToUse);
+
+      RoutingType routingType = addrInfo == null ? null : 
addrInfo.getRoutingType();
+      RoutingType rt = (routingType == null ? 
ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
+      if (autoCreateAddress) {
+         if (info == null) {
+            final AddressInfo addressInfo = new AddressInfo(addressToUse, rt);
+            addressInfo.setAutoCreated(true);
+            addressInfo.setInternal(addrInfo == null ? false : 
addrInfo.isInternal());
+            addAddressInfo(addressInfo);
+         } else if (!info.getRoutingTypes().contains(rt)) {
+            Set<RoutingType> routingTypes = new HashSet<>();
+            routingTypes.addAll(info.getRoutingTypes());
+            routingTypes.add(rt);
+            updateAddressInfo(info.getName(), routingTypes);
+         }
+      } else if (info == null) {
+         throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressToUse);
+      } else if (!info.getRoutingTypes().contains(rt)) {
+         throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(rt, 
info.getName().toString(), info.getRoutingTypes());
+      }
+
+      final QueueConfig queueConfig = 
queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(addrInfo.getRoutingType()).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
+
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> 
plugin.beforeCreateQueue(queueConfig) : null);
+
+      final Queue queue = queueFactory.createQueueWith(queueConfig);
+
+      if (transientQueue) {
+         queue.setConsumersRefCount(new TransientQueueManagerImpl(this, 
queue.getName()));
+      } else {
+         queue.setConsumersRefCount(new QueueManagerImpl(this, 
queue.getName()));
+      }
+
+      final QueueBinding localQueueBinding = new 
LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+
+      if (queue.isDurable()) {
+         storageManager.addQueueBinding(txID, localQueueBinding);
+      }
+
+      try {
+         postOffice.addBinding(localQueueBinding);
+         if (queue.isDurable()) {
+            storageManager.commitBindings(txID);
+         }
+      } catch (Exception e) {
+         try {
+            if (durable) {
+               storageManager.rollbackBindings(txID);
+            }
+            final PageSubscription pageSubscription = 
queue.getPageSubscription();
+            try {
+               queue.close();
+            } finally {
+               if (pageSubscription != null) {
+                  pageSubscription.destroy();
+               }
+            }
+         } catch (Throwable ignored) {
+            logger.debug(ignored.getMessage(), ignored);
+         }
+         throw e;
+      }
+
+      if (addrInfo == null || !addrInfo.isInternal()) {
+         managementService.registerQueue(queue, queue.getAddress(), 
storageManager);
+      }
+
+      callPostQueueCreationCallbacks(queue.getName());
+
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> 
plugin.afterCreateQueue(queue) : null);
+
+      return queue;
+   }
+
    @Override
    public Queue createQueue(final SimpleString address,
                             final RoutingType routingType,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 3fb808f..8a7691e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,17 +16,16 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.PrefixUtil;
+
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
 public class AddressInfo {
 
-   //from openwire
-   public static final SimpleString ADVISORY_TOPIC = new 
SimpleString("ActiveMQ.Advisory.");
-
    private long id;
 
    private final SimpleString name;
@@ -35,6 +34,8 @@ public class AddressInfo {
 
    private Set<RoutingType> routingTypes;
 
+   private boolean internal = false;
+
    public AddressInfo(SimpleString name) {
       this(name, new HashSet<>());
    }
@@ -130,6 +131,27 @@ public class AddressInfo {
    }
 
    public boolean isInternal() {
-      return this.name.startsWith(ADVISORY_TOPIC);
+      return this.internal;
    }
+
+   public void setInternal(boolean internal) {
+      this.internal = internal;
+   }
+
+   public AddressInfo create(SimpleString name, RoutingType routingType) {
+      AddressInfo info = new AddressInfo(name, routingType);
+      info.setInternal(this.internal);
+      return info;
+   }
+
+   public AddressInfo getAddressAndRoutingType(Map<SimpleString, RoutingType> 
prefixes) {
+      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
+         if (this.getName().startsWith(entry.getKey())) {
+            AddressInfo newAddressInfo = 
this.create(PrefixUtil.removePrefix(this.getName(), entry.getKey()), 
entry.getValue());
+            return newAddressInfo;
+         }
+      }
+      return this;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index c48cb2e..dacc6bf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -548,9 +548,13 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    }
 
    @Override
-   public Queue createQueue(final SimpleString address,
+   public Queue createQueue(AddressInfo addressInfo, SimpleString name, 
SimpleString filterString, boolean temporary, boolean durable) throws Exception 
{
+      AddressSettings as = 
server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
+      return createQueue(addressInfo, name, filterString, temporary, durable, 
as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), false);
+   }
+
+   public Queue createQueue(final AddressInfo addressInfo,
                             final SimpleString name,
-                            final RoutingType routingType,
                             final SimpleString filterString,
                             final boolean temporary,
                             final boolean durable,
@@ -559,18 +563,18 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
                             final boolean autoCreated) throws Exception {
       final SimpleString unPrefixedName = removePrefix(name);
 
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, 
routingType);
+      AddressInfo art = getAddressAndRoutingType(addressInfo);
 
       if (durable) {
          // make sure the user has privileges to create this queue
-         securityCheck(address, name, CheckType.CREATE_DURABLE_QUEUE, this);
+         securityCheck(addressInfo.getName(), name, 
CheckType.CREATE_DURABLE_QUEUE, this);
       } else {
-         securityCheck(address, name, CheckType.CREATE_NON_DURABLE_QUEUE, 
this);
+         securityCheck(addressInfo.getName(), name, 
CheckType.CREATE_NON_DURABLE_QUEUE, this);
       }
 
       server.checkQueueCreationLimit(getUsername());
 
-      Queue queue = server.createQueue(art.getA(), art.getB(), unPrefixedName, 
filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, 
autoCreated, maxConsumers, purgeOnNoConsumers, 
server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
+      Queue queue = server.createQueue(art, unPrefixedName, filterString, 
SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, 
maxConsumers, purgeOnNoConsumers, 
server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses());
 
       if (temporary) {
          // Temporary queue in core simply means the queue will be deleted if
@@ -591,13 +595,25 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       }
 
       if (logger.isDebugEnabled()) {
-         logger.debug("Queue " + unPrefixedName + " created on address " + 
address +
-                         " with filter=" + filterString + " temporary = " +
-                         temporary + " durable=" + durable + " on session 
user=" + this.username + ", connection=" + this.remotingConnection);
+         logger.debug("Queue " + unPrefixedName + " created on address " + 
addressInfo.getName() +
+                 " with filter=" + filterString + " temporary = " +
+                 temporary + " durable=" + durable + " on session user=" + 
this.username + ", connection=" + this.remotingConnection);
       }
 
       return queue;
+   }
 
+   @Override
+   public Queue createQueue(final SimpleString address,
+                            final SimpleString name,
+                            final RoutingType routingType,
+                            final SimpleString filterString,
+                            final boolean temporary,
+                            final boolean durable,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean autoCreated) throws Exception {
+      return createQueue(new AddressInfo(address, routingType), name, 
filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, 
autoCreated);
    }
 
    @Override
@@ -613,6 +629,12 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    }
 
    @Override
+   public Queue createQueue(AddressInfo addressInfo, SimpleString name, 
SimpleString filterString, boolean temporary, boolean durable, boolean 
autoCreated) throws Exception {
+      AddressSettings as = 
server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
+      return createQueue(addressInfo, name, filterString, temporary, durable, 
as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), autoCreated);
+   }
+
+   @Override
    public AddressInfo createAddress(final SimpleString address,
                                     Set<RoutingType> routingTypes,
                                     final boolean autoCreated) throws 
Exception {
@@ -626,10 +648,15 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    public AddressInfo createAddress(final SimpleString address,
                                     RoutingType routingType,
                                     final boolean autoCreated) throws 
Exception {
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, 
routingType);
-      securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
-      server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), 
art.getB()).setAutoCreated(autoCreated));
-      return server.getAddressInfo(art.getA());
+      return createAddress(new AddressInfo(address, routingType), autoCreated);
+   }
+
+   @Override
+   public AddressInfo createAddress(AddressInfo addressInfo, boolean 
autoCreated) throws Exception {
+      AddressInfo art = getAddressAndRoutingType(addressInfo);
+      securityCheck(art.getName(), CheckType.CREATE_ADDRESS, this);
+      server.addOrUpdateAddressInfo(art.setAutoCreated(autoCreated));
+      return server.getAddressInfo(art.getName());
    }
 
    @Override
@@ -1672,12 +1699,12 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
             }
          } */
 
-      Pair<SimpleString, RoutingType> art = 
getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
+      AddressInfo art = getAddressAndRoutingType(new 
AddressInfo(msg.getAddressSimpleString(), routingType));
 
       // Consumer
       // check the user has write access to this address.
       try {
-         securityCheck(art.getA(), CheckType.SEND, this);
+         securityCheck(art.getName(), CheckType.SEND, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends && tx != null) {
             tx.markAsRollbackOnly(e);
@@ -1695,8 +1722,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       }
 
       try {
-         routingContext.setAddress(art.getA());
-         routingContext.setRoutingType(art.getB());
+         routingContext.setAddress(art.getName());
+         routingContext.setRoutingType(art.getRoutingType());
 
          result = postOffice.route(msg, routingContext, direct);
 
@@ -1738,12 +1765,11 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
    }
 
    @Override
-   public Pair<SimpleString, RoutingType> 
getAddressAndRoutingType(SimpleString address,
-                                                                   RoutingType 
defaultRoutingType) {
+   public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) {
       if (prefixEnabled) {
-         return PrefixUtil.getAddressAndRoutingType(address, 
defaultRoutingType, prefixes);
+         return addressInfo.getAddressAndRoutingType(prefixes);
       }
-      return new Pair<>(address, defaultRoutingType);
+      return addressInfo;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 81a8e84..8a7e009 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -233,17 +233,25 @@ public class ManagementServiceImpl implements 
ManagementService {
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.ADDRESS + address);
    }
-   @Override
+
    public synchronized void registerQueue(final Queue queue,
-                                          final SimpleString address,
+                                          final AddressInfo addressInfo,
                                           final StorageManager storageManager) 
throws Exception {
-      QueueControlImpl queueControl = new QueueControlImpl(queue, 
address.toString(), postOffice, storageManager, securityStore, 
addressSettingsRepository);
+
+      if (addressInfo.isInternal()) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("won't register internal queue: " + queue);
+         }
+         return;
+      }
+
+      QueueControlImpl queueControl = new QueueControlImpl(queue, 
addressInfo.getName().toString(), postOffice, storageManager, securityStore, 
addressSettingsRepository);
       if (messageCounterManager != null) {
          MessageCounter counter = new 
MessageCounter(queue.getName().toString(), null, queue, false, 
queue.isDurable(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);
          
messageCounterManager.registerMessageCounter(queue.getName().toString(), 
counter);
       }
-      ObjectName objectName = objectNameBuilder.getQueueObjectName(address, 
queue.getName(), queue.getRoutingType());
+      ObjectName objectName = 
objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), 
queue.getRoutingType());
       registerInJMX(objectName, queueControl);
       registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
 
@@ -251,6 +259,12 @@ public class ManagementServiceImpl implements 
ManagementService {
          logger.debug("registered queue " + objectName);
       }
    }
+   @Override
+   public synchronized void registerQueue(final Queue queue,
+                                          final SimpleString address,
+                                          final StorageManager storageManager) 
throws Exception {
+      registerQueue(queue, new AddressInfo(address), storageManager);
+   }
 
    @Override
    public synchronized void unregisterQueue(final SimpleString name, final 
SimpleString address, RoutingType routingType) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
index 1e1c8c4..cb2054a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/HierarchicalRepository.java
@@ -23,6 +23,7 @@ import java.util.Set;
 /**
  * allows objects to be mapped against a regex pattern and held in order in a 
list
  */
+// TODO: could AddressInfo be used as the 'match' key?
 public interface HierarchicalRepository<T> {
 
    void disableListeners();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md 
b/docs/user-manual/en/protocols-interoperability.md
index 8755935..7f8b0f5 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -178,6 +178,33 @@ maxInactivityDurationInitalDelay. The shortest duration is 
taken for the connect
 
 More details please see [ActiveMQ 
InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html).
 
+### Disable/Enable Advisories
+
+By default, advisory topics ([ActiveMQ 
Advisory](http://activemq.apache.org/advisory-message.html))
+are created in order to send certain types of advisory messages to listening 
clients. As a result,
+advisory addresses and queues can be displayed on the management console, 
along with user deployed
+addresses and queues. This sometimes causes confusion because the advisory 
objects are internally
+managed without users being aware of them. In addition, users may not want the 
advisory topics at all
+(they consume extra resources and incur a performance penalty) and it is convenient to 
disable them entirely
+from the broker side.
+
+The protocol provides two parameters to control advisory behaviors on the 
broker side.
+
+* supportAdvisory
+Whether or not the broker supports advisory messages. If the value is true, 
advisory addresses/
+queues will be created. If the value is false, no advisory addresses/queues 
are created. Default
+value is true. 
+
+* suppressInternalManagementObjects
+Whether or not the advisory addresses/queues, if any, will be registered to 
management service
+(e.g. JMX registry). If set to true, no advisory addresses/queues will be 
registered. If set to
+false, those are registered and will be displayed on the management console. 
Default value is
+true.
+
+The two parameters are configured on openwire acceptors, via URLs or API. For 
example:
+
+    <acceptor 
name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
+
 ## MQTT
 
 MQTT is a light weight, client to server, publish / subscribe messaging 
protocol.  MQTT has been specifically

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fe88e1c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
index 3ff3b6b..6a31a48 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/management/OpenWireManagementTest.java
@@ -17,24 +17,35 @@
 package org.apache.activemq.artemis.tests.integration.openwire.management;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.advisory.ConsumerEventSource;
 import org.apache.activemq.advisory.ProducerEventSource;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
 import 
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
 
+@RunWith(Parameterized.class)
 public class OpenWireManagementTest extends OpenWireTestBase {
 
    private ActiveMQServerControl serverControl;
@@ -44,6 +55,27 @@ public class OpenWireManagementTest extends OpenWireTestBase 
{
 
    private ConnectionFactory factory;
 
+   @Parameterized.Parameters(name = 
"useDefault={0},supportAdvisory={1},suppressJmx={2}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+         {true, false, false},
+         {false, true, false},
+         {false, true, true},
+         {false, false, false},
+         {false, false, true}
+      });
+   }
+
+   private boolean useDefault;
+   private boolean supportAdvisory;
+   private boolean suppressJmx;
+
+   public OpenWireManagementTest(boolean useDefault, boolean supportAdvisory, 
boolean suppressJmx) {
+      this.useDefault = useDefault;
+      this.supportAdvisory = supportAdvisory;
+      this.suppressJmx = suppressJmx;
+   }
+
    @Before
    @Override
    public void setUp() throws Exception {
@@ -55,6 +87,19 @@ public class OpenWireManagementTest extends OpenWireTestBase 
{
    @Override
    protected void extraServerConfig(Configuration serverConfig) {
       serverConfig.setJMXManagementEnabled(true);
+      if (useDefault) {
+         //don't set parameters explicitly
+         return;
+      }
+      Set<TransportConfiguration> acceptorConfigs = 
serverConfig.getAcceptorConfigurations();
+      for (TransportConfiguration tconfig : acceptorConfigs) {
+         if ("netty".equals(tconfig.getName())) {
+            Map<String, Object> params = tconfig.getExtraParams();
+            params.put("supportAdvisory", supportAdvisory);
+            params.put("suppressInternalManagementObjects", suppressJmx);
+            System.out.println("Now use properties: " + params);
+         }
+      }
    }
 
    @Test
@@ -67,7 +112,7 @@ public class OpenWireManagementTest extends OpenWireTestBase 
{
       String[] addresses = serverControl.getAddressNames();
       assertEquals(3, addresses.length);
       for (String addr : addresses) {
-         assertFalse(addr.startsWith(AddressInfo.ADVISORY_TOPIC.toString()));
+         assertFalse(addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX));
       }
 
       try (Connection connection = factory.createConnection()) {
@@ -88,11 +133,61 @@ public class OpenWireManagementTest extends 
OpenWireTestBase {
          //after that point several advisory addresses are created.
          //make sure they are not accessible via management api.
          addresses = serverControl.getAddressNames();
+         boolean hasInternalAddress = false;
          for (String addr : addresses) {
-            
assertFalse(addr.startsWith(AddressInfo.ADVISORY_TOPIC.toString()));
+            hasInternalAddress = 
addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
+            if (hasInternalAddress) {
+               break;
+            }
          }
+         assertEquals(!useDefault && supportAdvisory && !suppressJmx, 
hasInternalAddress);
+
          consumerEventSource.stop();
          producerEventSource.stop();
       }
    }
+
+   @Test
+   public void testHiddenInternalQueue() throws Exception {
+
+      server.createQueue(queueName1, RoutingType.ANYCAST, queueName1, null, 
true, false, -1, false, true);
+
+      String[] queues = serverControl.getQueueNames();
+      assertEquals(1, queues.length);
+      for (String queue : queues) {
+         assertFalse(checkQueueFromInternalAddress(queue));
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createQueue(queueName1.toString());
+
+         //this causes advisory queues to be created
+         session.createProducer(destination);
+
+         queues = serverControl.getQueueNames();
+         boolean hasInternal = false;
+         String targetQueue = null;
+         for (String queue : queues) {
+            hasInternal = checkQueueFromInternalAddress(queue);
+            if (hasInternal) {
+               targetQueue = queue;
+               break;
+            }
+         }
+         assertEquals("targetQueue: " + targetQueue, !useDefault && 
supportAdvisory && !suppressJmx, hasInternal);
+      }
+   }
+
+   private boolean checkQueueFromInternalAddress(String queue) throws 
JMSException, ActiveMQException {
+      try (Connection coreConn = coreCf.createConnection()) {
+         ActiveMQSession session = (ActiveMQSession) coreConn.createSession();
+         ClientSession coreSession = session.getCoreSession();
+         ClientSession.QueueQuery query = coreSession.queueQuery(new 
SimpleString(queue));
+         assertTrue("Queue doesn't exist: " + queue, query.isExists());
+         SimpleString qAddr = query.getAddress();
+         return 
qAddr.toString().startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
+      }
+   }
 }

Reply via email to