Repository: activemq Updated Branches: refs/heads/trunk 89e876797 -> 94b404d0a
Fixed AMQ-5160, allowed wildcard subscriptions for future destinations, added tests for wildcard authorization, fixed consumer and producer AdvisoryTopic names for composite destinations by replacing ',' (comma) with '‚' (U+201A SINGLE LOW-9 QUOTATION MARK, i.e. the HTML entity &sbquo;) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/94b404d0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/94b404d0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/94b404d0 Branch: refs/heads/trunk Commit: 94b404d0aba15b6963e66f18ae31393cb64bfe79 Parents: a38a7c0 Author: Dhiraj Bokde <[email protected]> Authored: Thu May 1 19:13:01 2014 -0700 Committer: Dejan Bosanac <[email protected]> Committed: Mon May 5 10:06:06 2014 +0200 ---------------------------------------------------------------------- .../activemq/broker/region/AbstractRegion.java | 27 +++++-- .../broker/region/AbstractSubscription.java | 7 +- .../activemq/broker/region/Subscription.java | 6 ++ .../activemq/security/AuthorizationBroker.java | 2 +- .../activemq/advisory/AdvisorySupport.java | 16 +++- .../filter/CompositeDestinationFilter.java | 8 +- .../activemq/filter/DestinationFilter.java | 1 + .../org/apache/activemq/AuthorizationTest.java | 85 +++++++++++++++++++- .../authorizationTest-wildcard-users-guests.xml | 57 +++++++++++++ .../region/QueueDuplicatesFromStoreTest.java | 5 ++ .../region/SubscriptionAddRemoveQueueTest.java | 8 +- 11 files changed, 207 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 9760501..e443d53 100755 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -168,8 +168,13 @@ public abstract class AbstractRegion implements Region { try { dest.addSubscription(context, sub); rc.add(sub); - } catch (Exception e) { - LOG.error("Subscription error for " + sub + ": " + e.getMessage(), e); + } catch (SecurityException e) { + if (sub.isWildcard()) { + LOG.debug("Subscription denied for " + sub + " to destination " + + dest.getActiveMQDestination() + ": " + e.getMessage()); + } else { + throw e; + } } } } @@ -318,10 +323,20 @@ public abstract class AbstractRegion implements Region { try { dest.addSubscription(context, sub); removeList.add(dest); - } finally { - // remove subscriptions added earlier - for (Destination remove : removeList) { - remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); + } catch (SecurityException e){ + if (sub.isWildcard()) { + LOG.debug("Subscription denied for " + sub + " to destination " + + dest.getActiveMQDestination() + ": " + e.getMessage()); + } else { + // remove partial subscriptions + for (Destination remove : removeList) { + try { + remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); + } catch (Exception ex) { + LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex); + } + } + throw e; } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 3a2e2ee..1ed2fae 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -21,10 +21,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.management.ObjectName; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -109,6 +109,11 @@ public abstract class AbstractSubscription implements Subscription { } @Override + public boolean isWildcard() { + return destinationFilter.isWildcard(); + } + + @Override public boolean matches(ActiveMQDestination destination) { return destinationFilter.matches(destination); } http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java index a2c4502..ee0e218 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -57,6 +57,12 @@ public interface Subscription extends SubscriptionRecovery { Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception; /** + * Returns true if this subscription is a Wildcard subscription. + * @return true if wildcard subscription. + */ + boolean isWildcard(); + + /** * Is the subscription interested in the message? 
* @param node * @param context http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index 7b254e4..db482ba 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -67,7 +67,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB authorizationMap = map; } - public SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException { + protected SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException { final SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { throw new SecurityException("User is not authenticated."); http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index fb37f76..bcf1a07 100755 --- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -105,11 +105,13 @@ public final class AdvisorySupport { } public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) { + String prefix; if (destination.isQueue()) { - return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + 
destination.getPhysicalName()); + prefix = QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX; } else { - return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); + prefix = TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX; } + return getAdvisoryTopic(destination, prefix, true); } public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException { @@ -117,11 +119,17 @@ public final class AdvisorySupport { } public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) { + String prefix; if (destination.isQueue()) { - return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); + prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX; } else { - return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); + prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX; } + return getAdvisoryTopic(destination, prefix, false); + } + + private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) { + return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚")); } public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException { http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-client/src/main/java/org/apache/activemq/filter/CompositeDestinationFilter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/CompositeDestinationFilter.java b/activemq-client/src/main/java/org/apache/activemq/filter/CompositeDestinationFilter.java index 43ba696..20827c2 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/CompositeDestinationFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/CompositeDestinationFilter.java @@ -46,6 +46,12 @@ public class CompositeDestinationFilter extends DestinationFilter { } public 
boolean isWildcard() { - return true; + for (DestinationFilter filter : filters) { + if (filter.isWildcard()) { + return true; + } + } + return false; } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java index 0424524..fc1587c 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java @@ -72,4 +72,5 @@ public abstract class DestinationFilter implements BooleanExpression { return new SimpleDestinationFilter(destination); } + public abstract boolean isWildcard(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java index 33e762b..4cd92ac 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java @@ -17,14 +17,19 @@ package org.apache.activemq; import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.junit.Test; public class AuthorizationTest extends RuntimeConfigTestSupport { + private static final int 
RECEIVE_TIMEOUT = 1000; String configurationSeed = "authorizationTest"; @Test @@ -44,7 +49,6 @@ public class AuthorizationTest extends RuntimeConfigTestSupport { assertAllowed("user", "USERS.A"); assertAllowed("guest", "GUESTS.A"); assertDenied("user", "GUESTS.A"); - assertDenied("user", ">"); assertAllowedTemp("guest"); } @@ -68,6 +72,85 @@ public class AuthorizationTest extends RuntimeConfigTestSupport { assertDeniedTemp("guest"); } + @Test + public void testWildcard() throws Exception { + final String brokerConfig = configurationSeed + "-auth-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-wildcard-users-guests"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + final String ALL_USERS = "ALL.USERS"; + final String ALL_GUESTS = "ALL.GUESTS"; + + assertAllowed("user", ALL_USERS); + assertAllowed("guest", ALL_GUESTS); + assertDenied("user", ALL_USERS + "," + ALL_GUESTS); + assertDenied("guest", ALL_GUESTS + "," + ALL_USERS); + + final String ALL_PREFIX = "ALL.>"; + final String ALL_WILDCARD = "ALL.*"; + + assertAllowed("user", ALL_PREFIX); + assertAllowed("user", ALL_WILDCARD); + assertAllowed("guest", ALL_PREFIX); + assertAllowed("guest", ALL_WILDCARD); + + assertAllowed("user", "ALL.USERS,ALL.>"); + assertAllowed("guest", "ALL.GUESTS,ALL.*"); + assertDenied("user", "ALL.GUESTS,ALL.>"); + assertDenied("guest", "ALL.USERS,ALL.*"); + + assertDenied("user", "ALL.USERS,ALL.GUESTS.>"); + assertDenied("guest", "ALL.GUESTS,ALL.USERS.*"); + assertDenied("user", "ALL.USERS.*,ALL.GUESTS.>"); + assertDenied("guest", "ALL.GUESTS.>,ALL.USERS.*"); + + // subscribe to wildcards and check whether messages are actually filtered + final ActiveMQConnection userConn = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection("user", "user"); + final ActiveMQConnection guestConn = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection("guest", "guest"); + userConn.start(); + guestConn.start(); + 
try { + final Session userSession = userConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session guestSession = guestConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer userProducer = userSession.createProducer(null); + final MessageProducer guestProducer = guestSession.createProducer(null); + + // test prefix filter + MessageConsumer userConsumer = userSession.createConsumer(userSession.createQueue(ALL_PREFIX)); + MessageConsumer guestConsumer = guestSession.createConsumer(userSession.createQueue(ALL_PREFIX)); + + userProducer.send(userSession.createQueue(ALL_USERS), userSession.createTextMessage(ALL_USERS)); + assertNotNull(userConsumer.receive(RECEIVE_TIMEOUT)); + assertNull(guestConsumer.receive(RECEIVE_TIMEOUT)); + + guestProducer.send(guestSession.createQueue(ALL_GUESTS), guestSession.createTextMessage(ALL_GUESTS)); + assertNotNull(guestConsumer.receive(RECEIVE_TIMEOUT)); + assertNull(userConsumer.receive(RECEIVE_TIMEOUT)); + + userConsumer.close(); + guestConsumer.close(); + + // test wildcard filter + userConsumer = userSession.createConsumer(userSession.createQueue(ALL_WILDCARD)); + guestConsumer = guestSession.createConsumer(userSession.createQueue(ALL_WILDCARD)); + + userProducer.send(userSession.createQueue(ALL_USERS), userSession.createTextMessage(ALL_USERS)); + assertNotNull(userConsumer.receive(RECEIVE_TIMEOUT)); + assertNull(guestConsumer.receive(RECEIVE_TIMEOUT)); + + guestProducer.send(guestSession.createQueue(ALL_GUESTS), guestSession.createTextMessage(ALL_GUESTS)); + assertNotNull(guestConsumer.receive(RECEIVE_TIMEOUT)); + assertNull(userConsumer.receive(RECEIVE_TIMEOUT)); + + } finally { + userConn.close(); + guestConn.close(); + } + + assertAllowedTemp("guest"); + } + private void assertDeniedTemp(String userPass) { try { assertAllowedTemp(userPass); 
http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-runtime-config/src/test/resources/org/apache/activemq/authorizationTest-wildcard-users-guests.xml ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/authorizationTest-wildcard-users-guests.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/authorizationTest-wildcard-users-guests.xml new file mode 100644 index 0000000..667cf7f --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/authorizationTest-wildcard-users-guests.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false"> + <plugins> + <runtimeConfigurationPlugin checkPeriod="1000"/> + + <!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS --> + <jaasAuthenticationPlugin configuration="activemq-domain"/> + + <!-- lets configure a destination based authorization mechanism --> + <authorizationPlugin> + <map> + <authorizationMap> + <authorizationEntries> + <authorizationEntry queue=">" read="admins" write="admins" admin="admins"/> + <authorizationEntry queue="ALL.USERS.>" read="users" write="users" admin="users"/> + <authorizationEntry queue="ALL.GUESTS.>" read="guests" write="guests,users" admin="guests,users"/> + + <authorizationEntry topic=">" read="admins" write="admins" admin="admins"/> + <authorizationEntry topic="ALL.USERS.>" read="users" write="users" admin="users"/> + <authorizationEntry topic="ALL.GUESTS.>" read="guests" write="guests,users" admin="guests,users"/> + + <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" + admin="guests,users"/> + </authorizationEntries> + + <tempDestinationAuthorizationEntry> + <tempDestinationAuthorizationEntry read="tempDestinationAdmins,guests" write="tempDestinationAdmins,guests" + admin="tempDestinationAdmins,guests"/> + </tempDestinationAuthorizationEntry> + </authorizationMap> + </map> + </authorizationPlugin> + </plugins> + </broker> +</beans> http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java 
---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index e9c6664..f69c380 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -282,6 +282,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override + public boolean isWildcard() { + return false; + } + + @Override public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/activemq/blob/94b404d0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java index 2fa6fa7..6964842 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -42,7 +42,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.InvalidSelectorException; import javax.management.ObjectName; -import junit.framework.TestCase; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -61,6 +61,7 @@ import org.apache.activemq.filter.MessageEvaluationContext; import 
org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.TaskRunnerFactory; +import junit.framework.TestCase; public class SubscriptionAddRemoveQueueTest extends TestCase { @@ -322,6 +323,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { return null; } + @Override + public boolean isWildcard() { + return false; + } + public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { return new ArrayList<MessageReference>(dispatched);
