Repository: activemq-artemis
Updated Branches:
  refs/heads/master 78016cbe6 -> 88b23994c


ARTEMIS-1872 Check for queue exists before creating shared queue

1. Add test cases to verify the issue and the fix; the tests also check for the 
same behavior using CORE, OPENWIRE and AMQP JMS clients.
2. Update the Core client to check whether the queue exists before creating a 
shared queue, as per the createQueue logic.
3. Update ServerSessionPacketHandler to apply the same fix on the server side 
when handling packets from older clients.
4. Correct the AMQP protocol so that the correct error code is returned on a 
security exception, allowing AMQP JMS to correctly throw JMSSecurityException.
5. Correct the AMQP protocol to check whether the queue exists before creating it.
6. Correct the OpenWire protocol to check whether the address exists before creating it.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a9d9731f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a9d9731f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a9d9731f

Branch: refs/heads/master
Commit: a9d9731f0a644ff5bf6fb65583f1fabc832d9ae5
Parents: 78016cb
Author: Michael André Pearce <[email protected]>
Authored: Fri May 18 22:09:32 2018 +0100
Committer: Clebert Suconic <[email protected]>
Committed: Wed May 23 13:11:25 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/core/impl/PacketImpl.java     |   2 +
 .../artemis/jms/client/ActiveMQSession.java     |  23 +-
 .../amqp/broker/AMQPSessionCallback.java        |  27 +-
 .../amqp/proton/ProtonServerSenderContext.java  |   6 +-
 .../protocol/openwire/OpenWireConnection.java   |   6 +-
 .../core/ServerSessionPacketHandler.java        |  11 +-
 .../server/SecureConfigurationTest.java         | 260 +++++++++++++++++++
 .../src/test/resources/multicast_topic.xml      | 146 +++++++++++
 8 files changed, 459 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 684ca5c..c275e21 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -31,6 +31,8 @@ public class PacketImpl implements Packet {
 
    // 2.0.0
    public static final int ADDRESSING_CHANGE_VERSION = 129;
+   public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
+
 
    public static final SimpleString OLD_QUEUE_PREFIX = new 
SimpleString("jms.queue.");
    public static final SimpleString OLD_TOPIC_PREFIX = new 
SimpleString("jms.topic.");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 5f29211..3149ff0 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -46,6 +46,7 @@ import javax.transaction.xa.XAResource;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -630,16 +631,20 @@ public class ActiveMQSession implements QueueSession, 
TopicSession {
 
          queueName = 
ActiveMQDestination.createQueueNameForSubscription(durability == 
ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName);
 
-         try {
-            if (durability == ConsumerDurability.DURABLE) {
-               createSharedQueue(dest, RoutingType.MULTICAST, queueName, 
coreFilterString, true, response.getDefaultMaxConsumers(), 
response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), 
response.isDefaultLastValueQueue());
-            } else {
-               createSharedQueue(dest, RoutingType.MULTICAST, queueName, 
coreFilterString, false, response.getDefaultMaxConsumers(), 
response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), 
response.isDefaultLastValueQueue());
+         QueueQuery subResponse = session.queueQuery(queueName);
+
+         if (!(subResponse.isExists() && 
Objects.equals(subResponse.getAddress(), dest.getSimpleAddress()) && 
Objects.equals(subResponse.getFilterString(), coreFilterString))) {
+            try {
+               if (durability == ConsumerDurability.DURABLE) {
+                  createSharedQueue(dest, RoutingType.MULTICAST, queueName, 
coreFilterString, true, response.getDefaultMaxConsumers(), 
response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), 
response.isDefaultLastValueQueue());
+               } else {
+                  createSharedQueue(dest, RoutingType.MULTICAST, queueName, 
coreFilterString, false, response.getDefaultMaxConsumers(), 
response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), 
response.isDefaultLastValueQueue());
+               }
+            } catch (ActiveMQQueueExistsException ignored) {
+               // We ignore this because querying and then creating the queue 
wouldn't be idempotent
+               // we could also add a parameter to ignore existence what would 
require a bigger work around to avoid
+               // compatibility.
             }
-         } catch (ActiveMQQueueExistsException ignored) {
-            // We ignore this because querying and then creating the queue 
wouldn't be idempotent
-            // we could also add a parameter to ignore existence what would 
require a bigger work around to avoid
-            // compatibility.
          }
 
          consumer = session.createConsumer(queueName, null, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 105d58a..1301f0b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -237,35 +238,51 @@ public class AMQPSessionCallback implements 
SessionCallback {
    }
 
    public void createTemporaryQueue(SimpleString queueName, RoutingType 
routingType) throws Exception {
-      serverSession.createQueue(queueName, queueName, routingType, null, true, 
false);
+      createTemporaryQueue(queueName, queueName, routingType, null);
    }
 
    public void createTemporaryQueue(SimpleString address,
                                     SimpleString queueName,
                                     RoutingType routingType,
                                     SimpleString filter) throws Exception {
-      serverSession.createQueue(address, queueName, routingType, filter, true, 
false);
+      try {
+         serverSession.createQueue(address, queueName, routingType, filter, 
true, false);
+      } catch (ActiveMQSecurityException se) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
+      }
    }
 
    public void createUnsharedDurableQueue(SimpleString address,
                                           RoutingType routingType,
                                           SimpleString queueName,
                                           SimpleString filter) throws 
Exception {
-      serverSession.createQueue(address, queueName, routingType, filter, 
false, true, 1, false, false);
+      try {
+         serverSession.createQueue(address, queueName, routingType, filter, 
false, true, 1, false, false);
+      } catch (ActiveMQSecurityException se) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
+      }
    }
 
    public void createSharedDurableQueue(SimpleString address,
                                         RoutingType routingType,
                                         SimpleString queueName,
                                         SimpleString filter) throws Exception {
-      serverSession.createQueue(address, queueName, routingType, filter, 
false, true, -1, false, false);
+      try {
+         serverSession.createQueue(address, queueName, routingType, filter, 
false, true, -1, false, false);
+      } catch (ActiveMQSecurityException se) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
+      }
    }
 
    public void createSharedVolatileQueue(SimpleString address,
                                          RoutingType routingType,
                                          SimpleString queueName,
                                          SimpleString filter) throws Exception 
{
-      serverSession.createQueue(address, queueName, routingType, filter, 
false, false, -1, true, true);
+      try {
+         serverSession.createQueue(address, queueName, routingType, filter, 
false, false, -1, true, true);
+      } catch (ActiveMQSecurityException se) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
+      }
    }
 
    public QueueQueryResult queueQuery(SimpleString queueName, RoutingType 
routingType, boolean autoCreate) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 9b4704f..ddd9b39 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -23,7 +23,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -370,10 +369,9 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                isVolatile = true;
                if (shared && sender.getName() != null) {
                   queue = 
createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), 
sender.getName(), shared, global, isVolatile);
-                  try {
+                  QueueQueryResult result = sessionSPI.queueQuery(queue, 
routingTypeToUse, false);
+                  if (!(result.isExists() && 
Objects.equals(result.getAddress(), addressToUse) && 
Objects.equals(result.getFilterString(), simpleStringSelector))) {
                      sessionSPI.createSharedVolatileQueue(addressToUse, 
RoutingType.MULTICAST, queue, simpleStringSelector);
-                  } catch (ActiveMQQueueExistsException e) {
-                     //this is ok, just means its shared
                   }
                } else {
                   queue = 
SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 21b2d46..225aac4 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -812,8 +812,10 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             }
          } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() 
|| dest.isTemporary())) {
             try {
-               internalSession.createAddress(addressInfo, !dest.isTemporary());
-               created = true;
+               if (internalSession.getAddress(addressInfo.getName()) == null) {
+                  internalSession.createAddress(addressInfo, 
!dest.isTemporary());
+                  created = true;
+               }
             } catch (ActiveMQAddressExistsException exists) {
                // The address may have been created by another thread in the 
mean time.  Catch and do nothing.
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index edfd566..36273f8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -362,7 +363,10 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                case CREATE_SHARED_QUEUE: {
                   CreateSharedQueueMessage request = 
(CreateSharedQueueMessage) packet;
                   requiresResponse = request.isRequiresResponse();
-                  session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.isDurable(), request.getFilterString());
+                  QueueQueryResult result = 
session.executeQueueQuery(request.getQueueName());
+                  if (!(result.isExists() && 
Objects.equals(result.getAddress(), request.getAddress()) && 
Objects.equals(result.getFilterString(), request.getFilterString()))) {
+                     session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.isDurable(), request.getFilterString());
+                  }
                   if (requiresResponse) {
                      response = new NullResponseMessage();
                   }
@@ -371,7 +375,10 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                case CREATE_SHARED_QUEUE_V2: {
                   CreateSharedQueueMessage_V2 request = 
(CreateSharedQueueMessage_V2) packet;
                   requiresResponse = request.isRequiresResponse();
-                  session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.getRoutingType(), request.getFilterString(), 
request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), 
request.isExclusive(), request.isLastValue());
+                  QueueQueryResult result = 
session.executeQueueQuery(request.getQueueName());
+                  if (!(result.isExists() && 
Objects.equals(result.getAddress(), request.getAddress()) && 
Objects.equals(result.getFilterString(), request.getFilterString()))) {
+                     session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.getRoutingType(), request.getFilterString(), 
request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), 
request.isExclusive(), request.isLastValue());
+                  }
                   if (requiresResponse) {
                      response = new NullResponseMessage();
                   }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java
new file mode 100644
index 0000000..fd0a12e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SecureConfigurationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.core.config.FileDeploymentManager;
+import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class SecureConfigurationTest extends ActiveMQTestBase {
+
+   @Parameterized.Parameters(name = "{index}: protocol={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {"CORE"}, {"AMQP"}, {"OPENWIRE"}
+      });
+   }
+
+   /* NOT private @see 
https://github.com/junit-team/junit4/wiki/parameterized-tests */
+   @Parameterized.Parameter(0)
+   public String protocol;
+
+   @Test
+   public void testSecureSharedDurableSubscriber() throws Exception {
+      //This is because OpenWire does not support JMS 2.0
+      Assume.assumeFalse(protocol.equals("OPENWIRE"));
+
+      ActiveMQServer server = getActiveMQServer("multicast_topic.xml");
+      try {
+         server.start();
+         internal_testSecureSharedDurableSubscriber(getConnectionFactory("b", 
"b"));
+      } finally {
+         try {
+            server.stop();
+         } catch (Exception e) {
+         }
+      }
+   }
+
+   private void internal_testSecureSharedDurableSubscriber(ConnectionFactory 
connectionFactory) throws JMSException {
+      String message = "blah";
+
+      //Expect to be able to create subscriber on pre-defined/existing queue.
+      String messageRecieved = sendAndReceiveText(connectionFactory, null, 
message, "secured_topic_shared_durable", (t, s) -> 
s.createSharedDurableConsumer(t, "secured_topic_shared_durable/queue"));
+      Assert.assertEquals(message, messageRecieved);
+
+      try {
+         sendAndReceiveText(connectionFactory, null, message, 
"secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, 
"secured_topic_shared_durable/non-existant-queue"));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+
+      try {
+         sendAndReceiveText(connectionFactory, null, message, 
"secured_topic_shared_durable", (t, s) -> s.createSharedDurableConsumer(t, 
"secured_topic_shared_durable/queue", "age < 10"));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+   }
+
+   @Test
+   public void testSecureSharedSubscriber() throws Exception {
+      //This is because OpenWire does not support JMS 2.0
+      Assume.assumeFalse(protocol.equals("OPENWIRE"));
+
+      ActiveMQServer server = getActiveMQServer("multicast_topic.xml");
+      try {
+         server.start();
+         internal_testSecureSharedSubscriber(getConnectionFactory("b", "b"));
+      } finally {
+         try {
+            server.stop();
+         } catch (Exception e) {
+         }
+      }
+   }
+
+   private void internal_testSecureSharedSubscriber(ConnectionFactory 
connectionFactory) throws JMSException {
+      String message = "blah";
+
+      //Expect to be able to create subscriber on pre-defined/existing queue.
+      String messageRecieved = sendAndReceiveText(connectionFactory, null, 
message, "secured_topic_shared", (t, s) -> s.createSharedConsumer(t, 
"secured_topic_shared/queue"));
+      Assert.assertEquals(message, messageRecieved);
+
+      try {
+         sendAndReceiveText(connectionFactory, null, message, 
"secured_topic_shared", (t, s) -> s.createSharedConsumer(t, 
"secured_topic_shared/non-existant-queue"));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+
+      try {
+         sendAndReceiveText(connectionFactory, null, message, 
"secured_topic_shared", (t, s) -> s.createSharedConsumer(t, 
"secured_topic_shared/queue", "age < 10"));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+   }
+
+   @Test
+   public void testSecureDurableSubscriber() throws Exception {
+      ActiveMQServer server = getActiveMQServer("multicast_topic.xml");
+      try {
+         server.start();
+         internal_testSecureDurableSubscriber(getConnectionFactory("b", "b"));
+      } finally {
+         try {
+            server.stop();
+         } catch (Exception e) {
+         }
+      }
+   }
+
+   private void internal_testSecureDurableSubscriber(ConnectionFactory 
connectionFactory) throws JMSException {
+      String message = "blah";
+
+      //Expect to be able to create subscriber on pre-defined/existing queue.
+      String messageRecieved = sendAndReceiveText(connectionFactory, 
"clientId", message, "secured_topic_durable", (t, s) -> 
s.createDurableSubscriber(t, "secured_topic_durable/queue"));
+      Assert.assertEquals(message, messageRecieved);
+
+      try {
+         sendAndReceiveText(connectionFactory, "clientId", message, 
"secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, 
"secured_topic_durable/non-existant-queue"));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+
+      try {
+         sendAndReceiveText(connectionFactory, "clientId", message, 
"secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, 
"secured_topic_durable/queue", "age < 10", false));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+
+      try {
+         sendAndReceiveText(connectionFactory, "clientId", message, 
"secured_topic_durable", (t, s) -> s.createDurableSubscriber(t, 
"secured_topic_durable/queue", "age < 10", true));
+         Assert.fail("Security exception expected, but did not occur, 
excepetion expected as not permissioned to dynamically create queue");
+      } catch (JMSSecurityException j) {
+         //Expected exception
+      }
+   }
+
+   private ConnectionFactory getConnectionFactory(String user, String 
password) {
+      switch (protocol) {
+         case "CORE": return getActiveMQConnectionFactory(user, password);
+         case "AMQP" : return getAMQPConnectionFactory(user, password);
+         case "OPENWIRE": return getOpenWireConnectionFactory(user, password);
+         default: throw new IllegalStateException("Unsupported Protocol");
+      }
+   }
+
+   private ActiveMQConnectionFactory getActiveMQConnectionFactory(String user, 
String password) {
+      ActiveMQConnectionFactory activeMQConnection = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
+      activeMQConnection.setUser(user);
+      activeMQConnection.setPassword(password);
+      return activeMQConnection;
+   }
+
+   private JmsConnectionFactory getAMQPConnectionFactory(String user, String 
password) {
+      JmsConnectionFactory jmsConnectionFactory = new 
JmsConnectionFactory("amqp://localhost:61616");
+      jmsConnectionFactory.setUsername(user);
+      jmsConnectionFactory.setPassword(password);
+      return jmsConnectionFactory;
+   }
+
+   private org.apache.activemq.ActiveMQConnectionFactory 
getOpenWireConnectionFactory(String user, String password) {
+      org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory 
= new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
+      activeMQConnectionFactory.setUserName(user);
+      activeMQConnectionFactory.setPassword(password);
+      return activeMQConnectionFactory;
+   }
+
+   private String sendAndReceiveText(ConnectionFactory connectionFactory, 
String clientId, String message, String topicName, ConsumerSupplier 
consumerSupplier) throws JMSException {
+      String messageRecieved;
+      try (Connection connection = connectionFactory.createConnection()) {
+         if (clientId != null && !clientId.isEmpty()) {
+            connection.setClientID(clientId);
+         }
+         connection.start();
+         try (Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+            Topic topic = session.createTopic(topicName);
+            MessageConsumer messageConsumer = consumerSupplier.create(topic, 
session);
+            messageConsumer.receive(1000);
+
+            TextMessage messageToSend = session.createTextMessage(message);
+            session.createProducer(topic).send(messageToSend);
+
+            TextMessage received = (TextMessage) messageConsumer.receive(1000);
+            messageRecieved = received != null ? received.getText() : null;
+         }
+      }
+      return messageRecieved;
+   }
+
+   protected ActiveMQServer getActiveMQServer(String brokerConfig) throws 
Exception {
+      FileConfiguration fc = new FileConfiguration();
+      FileJMSConfiguration fileConfiguration = new FileJMSConfiguration();
+      FileDeploymentManager deploymentManager = new 
FileDeploymentManager(brokerConfig);
+      deploymentManager.addDeployable(fc);
+      deploymentManager.addDeployable(fileConfiguration);
+      deploymentManager.readConfiguration();
+
+
+      SecurityConfiguration securityConfiguration = new 
SecurityConfiguration();
+      securityConfiguration.addUser("a", "a");
+      securityConfiguration.addRole("a", "a");
+
+      securityConfiguration.addUser("b", "b");
+      securityConfiguration.addRole("b", "b");
+
+
+      ActiveMQJAASSecurityManager sm = new 
ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), 
securityConfiguration);
+
+      return addServer(new ActiveMQServerImpl(fc, sm));
+   }
+
+   private interface ConsumerSupplier {
+      MessageConsumer create(Topic topic, Session session) throws JMSException;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a9d9731f/tests/integration-tests/src/test/resources/multicast_topic.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/multicast_topic.xml 
b/tests/integration-tests/src/test/resources/multicast_topic.xml
new file mode 100644
index 0000000..cf5430e
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/multicast_topic.xml
@@ -0,0 +1,146 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>true</security-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
+
+      <!--
+       This value was determined through a calculation.
+       Your system could perform 25 writes per millisecond
+       on the current journal configuration.
+       That translates as a sync write every 40000 nanoseconds
+      -->
+      <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+      <addresses>
+         <address name="secured_topic_shared_durable">
+            <multicast>
+               <queue name="secured_topic_shared_durable/queue" />
+            </multicast>
+         </address>
+
+         <address name="secured_topic_shared">
+            <multicast>
+               <queue name="nonDurable.secured_topic_shared/queue" purge-on-no-consumers="true" />
+            </multicast>
+         </address>
+
+         <address name="secured_topic_durable">
+            <multicast>
+               <queue name="clientId.secured_topic_durable/queue" />
+            </multicast>
+         </address>
+      </addresses>
+
+      <acceptors>
+         <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
+      </acceptors>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="a,b"/>
+            <permission type="deleteNonDurableQueue" roles="a,b"/>
+            <permission type="createDurableQueue" roles="a,b"/>
+            <permission type="deleteDurableQueue" roles="a,b"/>
+            <permission type="browse" roles="a"/>
+            <permission type="send" roles="a,b"/>
+            <permission type="consume" roles="a,b" />
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="a"/>
+         </security-setting>
+         <security-setting match="secured_topic_shared_durable">
+            <permission type="createNonDurableQueue" roles="a"/>
+            <permission type="deleteNonDurableQueue" roles="a"/>
+            <permission type="createDurableQueue" roles="a"/>
+            <permission type="deleteDurableQueue" roles="a"/>
+            <permission type="browse" roles="a"/>
+            <permission type="send" roles="a,b"/>
+            <permission type="consume" roles="a,b" />
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="a"/>
+         </security-setting>
+         <security-setting match="secured_topic_shared">
+            <permission type="createNonDurableQueue" roles="a"/>
+            <permission type="deleteNonDurableQueue" roles="a"/>
+            <permission type="createDurableQueue" roles="a"/>
+            <permission type="deleteDurableQueue" roles="a"/>
+            <permission type="browse" roles="a"/>
+            <permission type="send" roles="a,b"/>
+            <permission type="consume" roles="a,b" />
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="a"/>
+         </security-setting>
+         <security-setting match="secured_topic_durable">
+            <permission type="createNonDurableQueue" roles="a"/>
+            <permission type="deleteNonDurableQueue" roles="a"/>
+            <permission type="createDurableQueue" roles="a"/>
+            <permission type="deleteDurableQueue" roles="a"/>
+            <permission type="browse" roles="a"/>
+            <permission type="send" roles="a,b"/>
+            <permission type="consume" roles="a,b" />
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="a"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <auto-create-queues>false</auto-create-queues>
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <max-size-bytes>10Mb</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>BLOCK</address-full-policy>
+         </address-setting>
+      </address-settings>
+   </core>
+</configuration>

Reply via email to