https://issues.apache.org/jira/browse/AMQ-5915
Adding a new JavaRuntimeConfigurationBroker which allows dynamic changes to parts of the broker through a Java API instead of just through XML configuration. This is useful if starting a broker with Java config and not using XML. It is also useful for temporary changes that shouldn't be persisted. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/43c3cae2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/43c3cae2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/43c3cae2 Branch: refs/heads/master Commit: 43c3cae2c069c98215e2c0f1ff19e883fd6c0b86 Parents: a01578a Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Mon Aug 10 15:19:26 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Oct 20 13:53:01 2015 +0000 ---------------------------------------------------------------------- .../AbstractRuntimeConfigurationBroker.java | 151 ++++++++ .../plugin/RuntimeConfigurationBroker.java | 141 ++----- .../plugin/UpdateVirtualDestinationsTask.java | 84 +++++ .../VirtualDestinationInterceptorProcessor.java | 59 +-- .../java/JavaRuntimeConfigurationBroker.java | 190 ++++++++++ .../java/JavaRuntimeConfigurationPlugin.java | 43 +++ .../plugin/jmx/RuntimeConfigurationView.java | 1 + .../activemq/AbstractAuthorizationTest.java | 67 ++++ .../activemq/AbstractVirtualDestTest.java | 129 +++++++ .../org/apache/activemq/AuthorizationTest.java | 52 +-- .../org/apache/activemq/DestinationsTest.java | 10 +- .../java/org/apache/activemq/MBeanTest.java | 12 +- .../apache/activemq/NetworkConnectorTest.java | 12 +- .../activemq/RuntimeConfigTestSupport.java | 2 +- .../org/apache/activemq/VirtualDestTest.java | 108 +----- .../activemq/java/JavaAuthenticationTest.java | 159 ++++++++ .../activemq/java/JavaAuthorizationTest.java | 244 ++++++++++++ .../activemq/java/JavaDestinationsTest.java | 107 ++++++ 
.../activemq/java/JavaNetworkConnectorTest.java | 170 +++++++++ .../activemq/java/JavaPolicyEntryTest.java | 141 +++++++ .../activemq/java/JavaVirtualDestTest.java | 376 +++++++++++++++++++ 21 files changed, 1929 insertions(+), 329 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java new file mode 100644 index 0000000..1306096 --- /dev/null +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.plugin; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import javax.management.ObjectName; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AbstractRuntimeConfigurationBroker extends BrokerFilter { + + public static final Logger LOG = LoggerFactory.getLogger(AbstractRuntimeConfigurationBroker.class); + protected final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock(); + protected final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock(); + protected Runnable monitorTask; + protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>(); + protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>(); + protected ObjectName objectName; + protected String infoString; + + public AbstractRuntimeConfigurationBroker(Broker next) { + super(next); + } + + @Override + public void start() throws Exception { + super.start(); + } + + @Override + public void stop() throws Exception { + if (monitorTask != null) { + try { + this.getBrokerService().getScheduler().cancel(monitorTask); + } catch (Exception letsNotStopStop) { + LOG.warn("Failed to cancel config monitor task", letsNotStopStop); + } + } + unregisterMbean(); + super.stop(); + } + + protected void registerMbean() { + + } + + protected void unregisterMbean() { + + } + + // modification to virtual destinations interceptor needs exclusive access to destination add + @Override + public Destination addDestination(ConnectionContext context, 
ActiveMQDestination destination, boolean createIfTemporary) throws Exception { + Runnable work = addDestinationWork.poll(); + if (work != null) { + try { + addDestinationBarrier.writeLock().lockInterruptibly(); + do { + work.run(); + work = addDestinationWork.poll(); + } while (work != null); + return super.addDestination(context, destination, createIfTemporary); + } finally { + addDestinationBarrier.writeLock().unlock(); + } + } else { + try { + addDestinationBarrier.readLock().lockInterruptibly(); + return super.addDestination(context, destination, createIfTemporary); + } finally { + addDestinationBarrier.readLock().unlock(); + } + } + } + + // modification to authentication plugin needs exclusive access to connection add + @Override + public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { + Runnable work = addConnectionWork.poll(); + if (work != null) { + try { + addConnectionBarrier.writeLock().lockInterruptibly(); + do { + work.run(); + work = addConnectionWork.poll(); + } while (work != null); + super.addConnection(context, info); + } finally { + addConnectionBarrier.writeLock().unlock(); + } + } else { + try { + addConnectionBarrier.readLock().lockInterruptibly(); + super.addConnection(context, info); + } finally { + addConnectionBarrier.readLock().unlock(); + } + } + } + + protected void debug(String s) { + LOG.debug(s); + } + + protected void info(String s) { + LOG.info(filterPasswords(s)); + if (infoString != null) { + infoString += s; + infoString += ";"; + } + } + + protected void info(String s, Throwable t) { + LOG.info(filterPasswords(s), t); + if (infoString != null) { + infoString += s; + infoString += ", " + t; + infoString += ";"; + } + } + + Pattern matchPassword = Pattern.compile("password=.*,"); + protected String filterPasswords(Object toEscape) { + return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,"); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java index c815d31..7a06c87 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java @@ -16,23 +16,10 @@ */ package org.apache.activemq.plugin; -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerContext; -import org.apache.activemq.broker.BrokerFilter; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.jmx.ManagementContext; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.plugin.jmx.RuntimeConfigurationView; -import org.apache.activemq.schema.core.DtoBroker; -import org.apache.activemq.spring.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.core.io.Resource; -import org.w3c.dom.Document; -import org.w3c.dom.Node; -import org.xml.sax.SAXException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.Properties; import javax.management.JMException; import javax.management.ObjectName; @@ -48,30 +35,29 @@ import javax.xml.transform.Source; import javax.xml.transform.stream.StreamSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.Properties; -import 
java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; -public class RuntimeConfigurationBroker extends BrokerFilter { +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.plugin.jmx.RuntimeConfigurationView; +import org.apache.activemq.schema.core.DtoBroker; +import org.apache.activemq.spring.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.xml.sax.SAXException; + +public class RuntimeConfigurationBroker extends AbstractRuntimeConfigurationBroker { public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class); public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin"; - private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock(); - private final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock(); PropertiesPlaceHolderUtil placeHolderUtil = null; private long checkPeriod; private long lastModified = -1; private Resource configToMonitor; private DtoBroker currentConfiguration; - private Runnable monitorTask; - protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>(); - protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>(); - private ObjectName objectName; - private String infoString; private Schema schema; public RuntimeConfigurationBroker(Broker next) { @@ -99,19 +85,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter { } @Override - public void stop() throws Exception { - if (monitorTask != null) { - try { - this.getBrokerService().getScheduler().cancel(monitorTask); - } catch (Exception letsNotStopStop) { - 
LOG.warn("Failed to cancel config monitor task", letsNotStopStop); - } - } - unregisterMbean(); - super.stop(); - } - - private void registerMbean() { + protected void registerMbean() { if (getBrokerService().isUseJmx()) { ManagementContext managementContext = getBrokerService().getManagementContext(); try { @@ -123,7 +97,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter { } } - private void unregisterMbean() { + @Override + protected void unregisterMbean() { if (objectName != null) { try { getBrokerService().getManagementContext().unregisterMBean(objectName); @@ -132,56 +107,6 @@ public class RuntimeConfigurationBroker extends BrokerFilter { } } - // modification to virtual destinations interceptor needs exclusive access to destination add - @Override - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { - Runnable work = addDestinationWork.poll(); - if (work != null) { - try { - addDestinationBarrier.writeLock().lockInterruptibly(); - do { - work.run(); - work = addDestinationWork.poll(); - } while (work != null); - return super.addDestination(context, destination, createIfTemporary); - } finally { - addDestinationBarrier.writeLock().unlock(); - } - } else { - try { - addDestinationBarrier.readLock().lockInterruptibly(); - return super.addDestination(context, destination, createIfTemporary); - } finally { - addDestinationBarrier.readLock().unlock(); - } - } - } - - // modification to authentication plugin needs exclusive access to connection add - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - Runnable work = addConnectionWork.poll(); - if (work != null) { - try { - addConnectionBarrier.writeLock().lockInterruptibly(); - do { - work.run(); - work = addConnectionWork.poll(); - } while (work != null); - super.addConnection(context, info); - } finally { - addConnectionBarrier.writeLock().unlock(); - } - } 
else { - try { - addConnectionBarrier.readLock().lockInterruptibly(); - super.addConnection(context, info); - } finally { - addConnectionBarrier.readLock().unlock(); - } - } - } - public String updateNow() { LOG.info("Manual configuration update triggered"); infoString = ""; @@ -210,26 +135,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter { } } - protected void debug(String s) { - LOG.debug(s); - } - - protected void info(String s) { - LOG.info(filterPasswords(s)); - if (infoString != null) { - infoString += s; - infoString += ";"; - } - } - protected void info(String s, Throwable t) { - LOG.info(filterPasswords(s), t); - if (infoString != null) { - infoString += s; - infoString += ", " + t; - infoString += ";"; - } - } private void applyModifications(Resource configToMonitor) { DtoBroker changed = loadConfiguration(configToMonitor); @@ -261,10 +167,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter { processor.processChanges(currentConfiguration, modifiedConfiguration); } - Pattern matchPassword = Pattern.compile("password=.*,"); - private String filterPasswords(Object toEscape) { - return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,"); - } + private DtoBroker loadConfiguration(Resource configToMonitor) { DtoBroker jaxbConfig = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java new file mode 100644 index 0000000..cd0121c --- /dev/null +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.activemq.broker.region.CompositeDestinationInterceptor; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; + +public abstract class UpdateVirtualDestinationsTask implements Runnable { + + private final AbstractRuntimeConfigurationBroker plugin; + + public UpdateVirtualDestinationsTask( + AbstractRuntimeConfigurationBroker plugin) { + super(); + this.plugin = plugin; + } + + @Override + public void run() { + + boolean updatedExistingInterceptor = false; + RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService() + .getRegionBroker(); + + for (DestinationInterceptor destinationInterceptor : plugin + .getBrokerService().getDestinationInterceptors()) { + if (destinationInterceptor instanceof VirtualDestinationInterceptor) { + // update existing interceptor + final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) 
destinationInterceptor; + + virtualDestinationInterceptor + .setVirtualDestinations(getVirtualDestinations()); + plugin.info("applied updates to: " + + virtualDestinationInterceptor); + updatedExistingInterceptor = true; + } + } + + if (!updatedExistingInterceptor) { + // add + VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); + virtualDestinationInterceptor.setVirtualDestinations(getVirtualDestinations()); + + List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>(); + interceptorsList.addAll(Arrays.asList(plugin.getBrokerService() + .getDestinationInterceptors())); + interceptorsList.add(virtualDestinationInterceptor); + + DestinationInterceptor[] destinationInterceptors = interceptorsList + .toArray(new DestinationInterceptor[] {}); + plugin.getBrokerService().setDestinationInterceptors( + destinationInterceptors); + + ((CompositeDestinationInterceptor) regionBroker + .getDestinationInterceptor()) + .setInterceptors(destinationInterceptors); + plugin.info("applied new: " + interceptorsList); + } + regionBroker.reapplyInterceptor(); + } + + protected abstract VirtualDestination[] getVirtualDestinations(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java index 356755c..1cbbb04 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java @@ -16,18 +16,21 @@ */ 
package org.apache.activemq.plugin; +import java.util.ArrayList; +import java.util.List; + import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.virtual.*; +import org.apache.activemq.broker.region.virtual.CompositeQueue; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.schema.core.DtoCompositeQueue; +import org.apache.activemq.schema.core.DtoCompositeTopic; import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor; import org.apache.activemq.schema.core.DtoVirtualTopic; -import org.apache.activemq.schema.core.DtoCompositeTopic; -import org.apache.activemq.schema.core.DtoCompositeQueue; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; public class VirtualDestinationInterceptorProcessor extends DefaultConfigurationProcessor { @@ -38,49 +41,23 @@ public class VirtualDestinationInterceptorProcessor extends DefaultConfiguration @Override public void addNew(Object o) { final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o; - plugin.addDestinationWork.add(new Runnable() { - public void run() { - boolean updatedExistingInterceptor = false; - RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker(); + plugin.addDestinationWork.add(new UpdateVirtualDestinationsTask(plugin) { - for (DestinationInterceptor destinationInterceptor : plugin.getBrokerService().getDestinationInterceptors()) { - if (destinationInterceptor instanceof VirtualDestinationInterceptor) { - // update existing interceptor - final VirtualDestinationInterceptor 
virtualDestinationInterceptor = - (VirtualDestinationInterceptor) destinationInterceptor; - - virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto)); - plugin.info("applied updates to: " + virtualDestinationInterceptor); - updatedExistingInterceptor = true; - } - } - - if (!updatedExistingInterceptor) { - // add - VirtualDestinationInterceptor virtualDestinationInterceptor = - new VirtualDestinationInterceptor(); - virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto)); - - List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>(); - interceptorsList.addAll(Arrays.asList(plugin.getBrokerService().getDestinationInterceptors())); - interceptorsList.add(virtualDestinationInterceptor); - - DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{}); - plugin.getBrokerService().setDestinationInterceptors(destinationInterceptors); - - ((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors); - plugin.info("applied new: " + interceptorsList); - } - regionBroker.reapplyInterceptor(); + @Override + protected VirtualDestination[] getVirtualDestinations() { + return fromDto(dto); } + }); + } @Override public void remove(Object o) { // whack it plugin.addDestinationWork.add(new Runnable() { + @Override public void run() { List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>(); for (DestinationInterceptor candidate : plugin.getBrokerService().getDestinationInterceptors()) { @@ -113,4 +90,6 @@ public class VirtualDestinationInterceptorProcessor extends DefaultConfiguration answer.toArray(array); return array; } + + } http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java ---------------------------------------------------------------------- diff --git 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java new file mode 100644 index 0000000..cd61f22 --- /dev/null +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.plugin.java; + +import java.util.Arrays; +import java.util.Set; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker; +import org.apache.activemq.plugin.UpdateVirtualDestinationsTask; +import org.apache.activemq.security.AuthorizationBroker; +import org.apache.activemq.security.AuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationBroker; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfigurationBroker { + + /** + * @param next + */ + public JavaRuntimeConfigurationBroker(Broker next) { + super(next); + } + + public static final Logger LOG = LoggerFactory.getLogger(JavaRuntimeConfigurationBroker.class); + + + //Virtual Destinations + public void setVirtualDestinations(final VirtualDestination[] virtualDestinations) { + this.addDestinationWork.add(new UpdateVirtualDestinationsTask(this) { + @Override + protected VirtualDestination[] getVirtualDestinations() { + return virtualDestinations; + } + }); + } + + //New Destinations + public void setDestinations(final ActiveMQDestination[] destinations) { + for (ActiveMQDestination destination : destinations) { + try { + if (!containsDestination(destination)) { + 
this.addDestination(this.getBrokerService().getAdminConnectionContext(), destination, true); + this.info("Added destination " + destination); + } + } catch (Exception e) { + this.info("Failed to add a new destination for: " + destination, e); + } + } + } + + protected boolean containsDestination(ActiveMQDestination destination) throws Exception { + return Arrays.asList(this.getBrokerService().getRegionBroker().getDestinations()).contains(destination); + } + + public void addNewDestination(ActiveMQDestination destination) { + try { + this.addDestination(this.getBrokerService().getAdminConnectionContext(), destination, true); + this.info("Added destination " + destination); + } catch (Exception e) { + this.info("Failed to add a new destination for: " + destination, e); + } + } + + //Network Connectors + public void addNetworkConnector(final DiscoveryNetworkConnector nc) { + try { + if (!getBrokerService().getNetworkConnectors().contains(nc)) { + getBrokerService().addNetworkConnector(nc); + nc.start(); + info("started new network connector: " + nc); + } else { + info("skipping network connector add, already exists: " + nc); + } + } catch (Exception e) { + info("Failed to add new networkConnector " + nc, e); + } + } + + public void updateNetworkConnector(final DiscoveryNetworkConnector nc) { + removeNetworkConnector(nc); + addNetworkConnector(nc); + } + + public void removeNetworkConnector(final DiscoveryNetworkConnector existingCandidate) { + if (getBrokerService().removeNetworkConnector(existingCandidate)) { + try { + existingCandidate.stop(); + info("stopped and removed networkConnector: " + existingCandidate); + } catch (Exception e) { + info("Failed to stop removed network connector: " + existingCandidate); + } + } + } + + //Policy entries + public void addNewPolicyEntry(PolicyEntry addition) { + PolicyMap existingMap = getBrokerService().getDestinationPolicy(); + existingMap.put(addition.getDestination(), addition); + applyRetrospectively(addition); + 
info("added policy for: " + addition.getDestination()); + } + + public void modifyPolicyEntry(PolicyEntry existing) { + PolicyMap existingMap = this.getBrokerService().getDestinationPolicy(); + + Set<?> existingEntry = existingMap.get(existing.getDestination()); + if (existingEntry.size() == 1) { + applyRetrospectively(existing); + this.info("updated policy for: " + existing.getDestination()); + } else { + this.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + existing.getDestination()); + } + } + + protected void applyRetrospectively(PolicyEntry updatedEntry) { + RegionBroker regionBroker = (RegionBroker) this.getBrokerService().getRegionBroker(); + for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { + Destination target = destination; + if (destination instanceof DestinationFilter) { + target = ((DestinationFilter)destination).getNext(); + } + if (target.getActiveMQDestination().isQueue()) { + updatedEntry.update((Queue) target); + } else if (target.getActiveMQDestination().isTopic()) { + updatedEntry.update((Topic) target); + } + this.debug("applied update to:" + target); + } + } + + //authentication plugin + public void updateSimpleAuthenticationPlugin(final SimpleAuthenticationPlugin updatedPlugin) { + try { + final SimpleAuthenticationBroker authenticationBroker = + (SimpleAuthenticationBroker) getBrokerService().getBroker().getAdaptor(SimpleAuthenticationBroker.class); + addConnectionWork.add(new Runnable() { + @Override + public void run() { + authenticationBroker.setUserGroups(updatedPlugin.getUserGroups()); + authenticationBroker.setUserPasswords(updatedPlugin.getUserPasswords()); + authenticationBroker.setAnonymousAccessAllowed(updatedPlugin.isAnonymousAccessAllowed()); + authenticationBroker.setAnonymousUser(updatedPlugin.getAnonymousUser()); + authenticationBroker.setAnonymousGroup(updatedPlugin.getAnonymousGroup()); + } + }); + } catch (Exception e) { + 
info("failed to apply SimpleAuthenticationPlugin modifications to SimpleAuthenticationBroker", e); + } + } + + //authorization map + public void updateAuthorizationMap(final AuthorizationMap authorizationMap) { + try { + // replace authorization map - need exclusive write lock to total broker + AuthorizationBroker authorizationBroker = + (AuthorizationBroker) getBrokerService().getBroker().getAdaptor(AuthorizationBroker.class); + + authorizationBroker.setAuthorizationMap(authorizationMap); + } catch (Exception e) { + info("failed to apply modified AuthorizationMap to AuthorizationBroker", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationPlugin.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationPlugin.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationPlugin.java new file mode 100644 index 0000000..4eb30ef --- /dev/null +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationPlugin.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin.java; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JavaRuntimeConfigurationPlugin implements BrokerPlugin { + public static final Logger LOG = LoggerFactory.getLogger(JavaRuntimeConfigurationPlugin.class); + + private JavaRuntimeConfigurationBroker runtimeConfigurationBroker; + + @Override + public Broker installPlugin(Broker broker) throws Exception { + LOG.info("installing javaRuntimeConfiguration plugin"); + runtimeConfigurationBroker = new JavaRuntimeConfigurationBroker(broker); + + return runtimeConfigurationBroker; + } + + public JavaRuntimeConfigurationBroker getBroker() { + return runtimeConfigurationBroker; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/jmx/RuntimeConfigurationView.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/jmx/RuntimeConfigurationView.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/jmx/RuntimeConfigurationView.java index 476e1ed..8830524 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/jmx/RuntimeConfigurationView.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/jmx/RuntimeConfigurationView.java @@ -17,6 +17,7 @@ package org.apache.activemq.plugin.jmx; import java.util.Date; + import org.apache.activemq.plugin.RuntimeConfigurationBroker; import org.springframework.core.io.Resource; http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractAuthorizationTest.java 
---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractAuthorizationTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractAuthorizationTest.java new file mode 100644 index 0000000..286d7c1 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractAuthorizationTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import static org.junit.Assert.fail; + +import javax.jms.JMSException; +import javax.jms.Session; + +public abstract class AbstractAuthorizationTest extends RuntimeConfigTestSupport { + + protected void assertDeniedTemp(String userPass) { + try { + assertAllowedTemp(userPass); + fail("Expected not allowed exception"); + } catch (Exception expected) { + LOG.debug("got:" + expected, expected); + } + } + + protected void assertAllowedTemp(String userPass) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createTemporaryQueue()); + } finally { + connection.close(); + } + + } + + protected void assertDenied(String userPass, String destination) { + try { + assertAllowed(userPass, destination); + fail("Expected not allowed exception"); + } catch (JMSException expected) { + LOG.debug("got:" + expected, expected); + } + } + + protected void assertAllowed(String userPass, String dest) throws JMSException { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue(dest)); + } finally { + connection.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractVirtualDestTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractVirtualDestTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractVirtualDestTest.java new file mode 100644 index 0000000..a548d55 --- /dev/null +++ 
b/activemq-runtime-config/src/test/java/org/apache/activemq/AbstractVirtualDestTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.Collections; +import java.util.Map; + +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * + * + */ +public abstract class AbstractVirtualDestTest extends RuntimeConfigTestSupport { + + protected void forceAddDestination(String dest) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue("Consumer.A." 
+ dest)); + connection.close(); + } + + protected void exerciseVirtualTopic(String topic) throws Exception { + exerciseVirtualTopic("Consumer.A.", topic); + } + + protected void exerciseVirtualTopic(String prefix, String topic) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(prefix + topic)); + LOG.info("new consumer for: " + consumer.getDestination()); + MessageProducer producer = session.createProducer(session.createTopic(topic)); + final String body = "To vt:" + topic; + Message message = sendAndReceiveMessage(session, consumer, producer, body); + assertNotNull("got message", message); + assertEquals("got expected message", body, ((TextMessage) message).getText()); + connection.close(); + } + + protected void exerciseCompositeQueue(String dest, String consumerQ) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerQ)); + LOG.info("new consumer for: " + consumer.getDestination()); + MessageProducer producer = session.createProducer(session.createQueue(dest)); + final String body = "To cq:" + dest; + Message message = sendAndReceiveMessage(session, consumer, producer, body); + assertNotNull("got message", message); + assertEquals("got expected message", body, ((TextMessage) message).getText()); + connection.close(); + } + + protected void exerciseFilteredCompositeQueue(String dest, String consumerDestination, String acceptedHeaderValue) throws Exception { + ActiveMQConnection connection = new 
ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerDestination)); + LOG.info("new consumer for: " + consumer.getDestination()); + MessageProducer producer = session.createProducer(session.createQueue(dest)); + + // positive test + String body = "To filtered cq:" + dest; + + Message message = sendAndReceiveMessage(session, consumer, producer, body, Collections.singletonMap("odd", acceptedHeaderValue)); + assertNotNull("The message did not reach the destination even though it should pass through the filter.", message); + assertEquals("Did not get expected message", body, ((TextMessage) message).getText()); + + // negative test + message = sendAndReceiveMessage(session, consumer, producer, "Not to filtered cq:" + dest, Collections.singletonMap("odd", "somethingElse")); + assertNull("The message reached the destination, but it should have been removed by the filter.", message); + + connection.close(); + } + + protected Message sendAndReceiveMessage(Session session, + ActiveMQMessageConsumer consumer, MessageProducer producer, + final String messageBody) throws Exception { + return sendAndReceiveMessage(session, consumer, producer, messageBody, + null); + } + + protected Message sendAndReceiveMessage(Session session, + ActiveMQMessageConsumer consumer, MessageProducer producer, + final String messageBody, Map<String, String> propertiesMap) + throws Exception { + TextMessage messageToSend = session.createTextMessage(messageBody); + if (propertiesMap != null) { + for (String headerKey : propertiesMap.keySet()) { + messageToSend.setStringProperty(headerKey, + propertiesMap.get(headerKey)); + } + } + producer.send(messageToSend); + LOG.info("sent to: " + producer.getDestination()); + + Message message = null; + for (int i = 0; i < 10 && 
message == null; i++) { + message = consumer.receive(1000); + } + return message; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java index b9282a0..0b933e9 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/AuthorizationTest.java @@ -16,18 +16,11 @@ */ package org.apache.activemq; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; + import org.junit.Test; -public class AuthorizationTest extends RuntimeConfigTestSupport { +public class AuthorizationTest extends AbstractAuthorizationTest { private static final int RECEIVE_TIMEOUT = 1000; String configurationSeed = "authorizationTest"; @@ -104,45 +97,4 @@ public class AuthorizationTest extends RuntimeConfigTestSupport { assertAllowedTemp("guest"); } - private void assertDeniedTemp(String userPass) { - try { - assertAllowedTemp(userPass); - fail("Expected not allowed exception"); - } catch (Exception expected) { - LOG.debug("got:" + expected, expected); - } - } - - private void assertAllowedTemp(String userPass) throws Exception { - ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass); - connection.start(); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createConsumer(session.createTemporaryQueue()); - } 
finally { - connection.close(); - } - - } - - private void assertDenied(String userPass, String destination) { - try { - assertAllowed(userPass, destination); - fail("Expected not allowed exception"); - } catch (JMSException expected) { - LOG.debug("got:" + expected, expected); - } - } - - private void assertAllowed(String userPass, String dest) throws JMSException { - ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass); - connection.start(); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createConsumer(session.createQueue(dest)); - } finally { - connection.close(); - } - } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java index 2500659..8e263d7 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java @@ -16,7 +16,10 @@ */ package org.apache.activemq; -import org.apache.activemq.broker.region.Destination; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -24,11 +27,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Set; - -import static org.junit.Assert.assertTrue; - public class DestinationsTest extends RuntimeConfigTestSupport { public static final Logger LOG = LoggerFactory.getLogger(DestinationsTest.class); 
http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/MBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/MBeanTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/MBeanTest.java index cce07cb..2642dc9 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/MBeanTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/MBeanTest.java @@ -16,19 +16,19 @@ */ package org.apache.activemq; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + import java.util.HashMap; + import javax.management.ObjectName; + import org.apache.activemq.plugin.RuntimeConfigurationBroker; -import org.apache.activemq.plugin.jmx.RuntimeConfigurationView; import org.apache.activemq.plugin.jmx.RuntimeConfigurationViewMBean; import org.apache.activemq.util.IntrospectionSupport; import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - public class MBeanTest extends RuntimeConfigTestSupport { @Test http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java index 3151cc4..97ced17 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java @@ -16,17 +16,14 @@ */ package org.apache.activemq; -import java.util.List; -import 
org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.util.Wait; -import org.junit.Before; -import org.junit.Test; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; +import org.junit.Test; + public class NetworkConnectorTest extends RuntimeConfigTestSupport { String configurationSeed = "networkConnectorTest"; @@ -63,7 +60,6 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport { } - @Test public void testMod() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/RuntimeConfigTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/RuntimeConfigTestSupport.java b/activemq-runtime-config/src/test/java/org/apache/activemq/RuntimeConfigTestSupport.java index 24894c3..f647008 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/RuntimeConfigTestSupport.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/RuntimeConfigTestSupport.java @@ -37,7 +37,7 @@ public class RuntimeConfigTestSupport { public static final int SLEEP = 4; // seconds public static final String EMPTY_UPDATABLE_CONFIG = "emptyUpdatableConfig1000" ; - BrokerService brokerService; + protected BrokerService brokerService; @Rule public TestWatcher watchman = new TestWatcher() { http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java 
b/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java index 0113e81..bd21118 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java @@ -16,22 +16,18 @@ */ package org.apache.activemq; -import java.util.Collections; -import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.TimeUnit; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; + import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.util.Wait; import org.junit.Test; - -import static org.junit.Assert.*; - -public class VirtualDestTest extends RuntimeConfigTestSupport { +public class VirtualDestTest extends AbstractVirtualDestTest { String configurationSeed = "virtualDestTest"; @@ -183,6 +179,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { forceAddDestination("AnyDest"); assertTrue("getDestinationInterceptors empty on time", Wait.waitFor(new Wait.Condition() { + @Override public boolean isSatisified() { return 0 == brokerService.getDestinationInterceptors().length; } @@ -235,7 +232,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length); } - + @Test public void testNewFilteredComposite() throws Exception { final String brokerConfig = configurationSeed + "-new-filtered-composite-vd-broker"; @@ -246,7 +243,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { applyNewConfig(brokerConfig, configurationSeed + "-add-filtered-composite-vd", SLEEP); exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", 
"VirtualDestination.QueueConsumer", "yes"); - } + } @Test public void testModFilteredComposite() throws Exception { @@ -259,96 +256,9 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { applyNewConfig(brokerConfig, configurationSeed + "-mod-filtered-composite-vd", SLEEP); exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no"); exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no"); - } - - private void forceAddDestination(String dest) throws Exception { - ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createConsumer(session.createQueue("Consumer.A." + dest)); - connection.close(); - } - - private void exerciseVirtualTopic(String topic) throws Exception { - exerciseVirtualTopic("Consumer.A.", topic); } - private void exerciseVirtualTopic(String prefix, String topic) throws Exception { - ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(prefix + topic)); - LOG.info("new consumer for: " + consumer.getDestination()); - MessageProducer producer = session.createProducer(session.createTopic(topic)); - final String body = "To vt:" + topic; - Message message = sendAndReceiveMessage(session, consumer, producer, body); - assertNotNull("got message", message); - assertEquals("got expected message", body, ((TextMessage) message).getText()); - connection.close(); - } - private void exerciseCompositeQueue(String dest, String consumerQ) throws Exception { - ActiveMQConnection connection = new 
ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerQ)); - LOG.info("new consumer for: " + consumer.getDestination()); - MessageProducer producer = session.createProducer(session.createQueue(dest)); - final String body = "To cq:" + dest; - Message message = sendAndReceiveMessage(session, consumer, producer, body); - assertNotNull("got message", message); - assertEquals("got expected message", body, ((TextMessage) message).getText()); - connection.close(); - } - - private void exerciseFilteredCompositeQueue(String dest, String consumerDestination, String acceptedHeaderValue) throws Exception { - ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerDestination)); - LOG.info("new consumer for: " + consumer.getDestination()); - MessageProducer producer = session.createProducer(session.createQueue(dest)); - - // positive test - String body = "To filtered cq:" + dest; - - Message message = sendAndReceiveMessage(session, consumer, producer, body, Collections.singletonMap("odd", acceptedHeaderValue)); - assertNotNull("The message did not reach the destination even though it should pass through the filter.", message); - assertEquals("Did not get expected message", body, ((TextMessage) message).getText()); - - // negative test - message = sendAndReceiveMessage(session, consumer, producer, "Not to filtered cq:" + dest, Collections.singletonMap("odd", "somethingElse")); - assertNull("The message reached the destination, but it should have been removed by the filter.", message); 
- - connection.close(); - } - private Message sendAndReceiveMessage(Session session, - ActiveMQMessageConsumer consumer, MessageProducer producer, - final String messageBody) throws Exception { - return sendAndReceiveMessage(session, consumer, producer, messageBody, null); - } - private Message sendAndReceiveMessage(Session session, - ActiveMQMessageConsumer consumer, MessageProducer producer, - final String messageBody, Map<String, String> propertiesMap) - throws Exception { - TextMessage messageToSend = session.createTextMessage(messageBody); - if (propertiesMap != null) { - for (String headerKey : propertiesMap.keySet()) { - messageToSend.setStringProperty(headerKey, propertiesMap.get(headerKey)); - } - } - producer.send(messageToSend); - LOG.info("sent to: " + producer.getDestination()); - - Message message = null; - for (int i = 0; i < 10 && message == null; i++) { - message = consumer.receive(1000); - } - return message; - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthenticationTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthenticationTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthenticationTest.java new file mode 100644 index 0000000..5f20f8c --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthenticationTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.java; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RuntimeConfigTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.security.TempDestinationAuthorizationEntry; +import org.junit.Test; + +public class JavaAuthenticationTest extends RuntimeConfigTestSupport { + + public static final int SLEEP = 2; // seconds + private JavaRuntimeConfigurationBroker javaConfigBroker; + private SimpleAuthenticationPlugin authenticationPlugin; + + public void startBroker(BrokerService brokerService) throws Exception { + this.brokerService = brokerService; + + authenticationPlugin = new SimpleAuthenticationPlugin(); + 
authenticationPlugin.setAnonymousAccessAllowed(false); + authenticationPlugin.setAnonymousGroup("ag"); + authenticationPlugin.setAnonymousUser("au"); + List<AuthenticationUser> users = new ArrayList<>(); + users.add(new AuthenticationUser("test_user_password", "test_user_password", "users")); + authenticationPlugin.setUsers(users); + + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(); + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(); + authorizationPlugin.setMap(authorizationMap); + @SuppressWarnings("rawtypes") + List<DestinationMapEntry> entries = new ArrayList<>(); + entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildQueueAuthorizationEntry("USERS.>", "users", "users", "users")); + + entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildTopicAuthorizationEntry("USERS.>", "users", "users", "users")); + + entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users")); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + tempEntry.setRead("tempDestinationAdmins"); + tempEntry.setWrite("tempDestinationAdmins"); + tempEntry.setAdmin("tempDestinationAdmins"); + + authorizationMap.setAuthorizationEntries(entries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + + brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin(), + authenticationPlugin, authorizationPlugin}); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + javaConfigBroker = + (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + } + + @Test + public void testMod() throws Exception { + BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + 
assertAllowed("test_user_password", "USERS.A"); + assertDenied("another_test_user_password", "USERS.A"); + + // anonymous + assertDenied(null, "USERS.A"); + + List<AuthenticationUser> users = new ArrayList<>(); + users.add(new AuthenticationUser("test_user_password", "test_user_password", "users")); + users.add(new AuthenticationUser("another_test_user_password", "another_test_user_password", "users")); + authenticationPlugin.setAnonymousGroup("users"); + authenticationPlugin.setUsers(users); + authenticationPlugin.setAnonymousAccessAllowed(true); + javaConfigBroker.updateSimpleAuthenticationPlugin(authenticationPlugin); + + TimeUnit.SECONDS.sleep(SLEEP); + + assertAllowed("test_user_password", "USERS.A"); + assertAllowed("another_test_user_password", "USERS.A"); + assertAllowed(null, "USERS.A"); + + } + + private void assertDenied(String userPass, String destination) { + try { + assertAllowed(userPass, destination); + fail("Expected not allowed exception"); + } catch (JMSException expected) { + LOG.debug("got:" + expected, expected); + } + } + + private void assertAllowed(String userPass, String dest) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection(userPass, userPass); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue(dest)); + } finally { + connection.close(); + } + } + + private AuthorizationEntry buildQueueAuthorizationEntry(String queue, String read, String write, String admin) throws Exception { + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setQueue(queue); + entry.setRead(read); + entry.setWrite(write); + entry.setAdmin(admin); + return entry; + } + + private AuthorizationEntry buildTopicAuthorizationEntry(String topic, String read, String write, String admin) throws Exception { + AuthorizationEntry entry = new AuthorizationEntry(); + 
entry.setTopic(topic); + entry.setRead(read); + entry.setWrite(write); + entry.setAdmin(admin); + return entry; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthorizationTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthorizationTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthorizationTest.java new file mode 100644 index 0000000..d255538 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaAuthorizationTest.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.java; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.AbstractAuthorizationTest; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.JaasAuthenticationPlugin; +import org.apache.activemq.security.TempDestinationAuthorizationEntry; +import org.junit.Test; + +public class JavaAuthorizationTest extends AbstractAuthorizationTest { + + public static final int SLEEP = 2; // seconds + String configurationSeed = "authorizationTest"; + + private JavaRuntimeConfigurationBroker javaConfigBroker; + + public void startBroker(BrokerService brokerService) throws Exception { + this.brokerService = brokerService; + + JaasAuthenticationPlugin authenticationPlugin = new JaasAuthenticationPlugin(); + authenticationPlugin.setConfiguration("activemq-domain"); + + + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(); + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(); + authorizationPlugin.setMap(authorizationMap); + + brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin(), + authenticationPlugin, authorizationPlugin}); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + javaConfigBroker = + (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + } + + @Test + public void testMod() throws Exception { + 
DefaultAuthorizationMap authorizationMap = buildUsersMap(); + BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + javaConfigBroker.updateAuthorizationMap(authorizationMap); + assertAllowed("user", "USERS.A"); + assertDenied("user", "GUESTS.A"); + + assertDeniedTemp("guest"); + + // applyNewConfig(brokerConfig, configurationSeed + "-users-guests", SLEEP); + + authorizationMap = buildUsersGuestsMap(); + javaConfigBroker.updateAuthorizationMap(authorizationMap); + TimeUnit.SECONDS.sleep(SLEEP); + + assertAllowed("user", "USERS.A"); + assertAllowed("guest", "GUESTS.A"); + assertDenied("user", "GUESTS.A"); + + assertAllowedTemp("guest"); + } + + @Test + public void testModRm() throws Exception { + DefaultAuthorizationMap authorizationMap = buildUsersGuestsMap(); + BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + javaConfigBroker.updateAuthorizationMap(authorizationMap); + TimeUnit.SECONDS.sleep(SLEEP); + assertAllowed("user", "USERS.A"); + assertAllowed("guest", "GUESTS.A"); + assertDenied("user", "GUESTS.A"); + assertAllowedTemp("guest"); + + authorizationMap = buildUsersMap(); + javaConfigBroker.updateAuthorizationMap(authorizationMap); + TimeUnit.SECONDS.sleep(SLEEP); + + assertAllowed("user", "USERS.A"); + assertDenied("user", "GUESTS.A"); + assertDeniedTemp("guest"); + } + + @Test + public void testWildcard() throws Exception { + DefaultAuthorizationMap authorizationMap = buildWildcardUsersGuestsMap(); + BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + javaConfigBroker.updateAuthorizationMap(authorizationMap); + TimeUnit.SECONDS.sleep(SLEEP); + + final String ALL_USERS = "ALL.USERS.>"; + final String ALL_GUESTS = "ALL.GUESTS.>"; + + assertAllowed("user", ALL_USERS); + assertAllowed("guest", 
ALL_GUESTS); + assertDenied("user", ALL_USERS + "," + ALL_GUESTS); + assertDenied("guest", ALL_GUESTS + "," + ALL_USERS); + + final String ALL_PREFIX = "ALL.>"; + + assertDenied("user", ALL_PREFIX); + assertDenied("guest", ALL_PREFIX); + + assertAllowed("user", "ALL.USERS.A"); + assertAllowed("user", "ALL.USERS.A,ALL.USERS.B"); + assertAllowed("guest", "ALL.GUESTS.A"); + assertAllowed("guest", "ALL.GUESTS.A,ALL.GUESTS.B"); + + assertDenied("user", "USERS.>"); + assertDenied("guest", "GUESTS.>"); + + + assertAllowedTemp("guest"); + } + + /** + * @return + * @throws Exception + */ + private DefaultAuthorizationMap buildWildcardUsersGuestsMap() throws Exception { + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(); + @SuppressWarnings("rawtypes") + List<DestinationMapEntry> entries = new ArrayList<>(); + entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildQueueAuthorizationEntry("ALL.USERS.>", "users", "users", "users")); + entries.add(buildQueueAuthorizationEntry("ALL.GUESTS.>", "guests", "guests,users", "guests,users")); + + entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildTopicAuthorizationEntry("ALL.USERS.>", "users", "users", "users")); + entries.add(buildTopicAuthorizationEntry("ALL.GUESTS.>", "guests", "guests,users", "guests,users")); + + entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users")); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + tempEntry.setRead("tempDestinationAdmins,guests"); + tempEntry.setWrite("tempDestinationAdmins,guests"); + tempEntry.setAdmin("tempDestinationAdmins,guests"); + + authorizationMap.setAuthorizationEntries(entries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + return authorizationMap; + } + + private DefaultAuthorizationMap buildUsersMap() throws Exception { + DefaultAuthorizationMap 
authorizationMap = new DefaultAuthorizationMap(); + @SuppressWarnings("rawtypes") + List<DestinationMapEntry> entries = new ArrayList<>(); + entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildQueueAuthorizationEntry("USERS.>", "users", "users", "users")); + + + entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildTopicAuthorizationEntry("USERS.>", "users", "users", "users")); + + entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users")); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + tempEntry.setRead("tempDestinationAdmins"); + tempEntry.setWrite("tempDestinationAdmins"); + tempEntry.setAdmin("tempDestinationAdmins"); + + authorizationMap.setAuthorizationEntries(entries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + + return authorizationMap; + } + + private DefaultAuthorizationMap buildUsersGuestsMap() throws Exception { + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(); + @SuppressWarnings("rawtypes") + List<DestinationMapEntry> entries = new ArrayList<>(); + entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildQueueAuthorizationEntry("USERS.>", "users", "users", "users")); + entries.add(buildQueueAuthorizationEntry("GUESTS.>", "guests", "guests,users", "guests,users")); + + entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins")); + entries.add(buildTopicAuthorizationEntry("USERS.>", "users", "users", "users")); + entries.add(buildTopicAuthorizationEntry("GUESTS.>", "guests", "guests,users", "guests,users")); + + entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users")); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + 
tempEntry.setRead("tempDestinationAdmins,guests"); + tempEntry.setWrite("tempDestinationAdmins,guests"); + tempEntry.setAdmin("tempDestinationAdmins,guests"); + + authorizationMap.setAuthorizationEntries(entries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + return authorizationMap; + } + + private AuthorizationEntry buildQueueAuthorizationEntry(String queue, String read, String write, String admin) throws Exception { + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setQueue(queue); + entry.setRead(read); + entry.setWrite(write); + entry.setAdmin(admin); + return entry; + } + + private AuthorizationEntry buildTopicAuthorizationEntry(String topic, String read, String write, String admin) throws Exception { + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setTopic(topic); + entry.setRead(read); + entry.setWrite(write); + entry.setAdmin(admin); + return entry; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaDestinationsTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaDestinationsTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaDestinationsTest.java new file mode 100644 index 0000000..95124e1 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaDestinationsTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.java; + +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.RuntimeConfigTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JavaDestinationsTest extends RuntimeConfigTestSupport { + public static final Logger LOG = LoggerFactory.getLogger(JavaDestinationsTest.class); + + private JavaRuntimeConfigurationBroker javaConfigBroker; + + public void startBroker(BrokerService brokerService) throws Exception { + this.brokerService = brokerService; + brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()}); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + javaConfigBroker = + (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + } + + @Test + public void testMod() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setDestinations(new ActiveMQDestination[] {new 
ActiveMQQueue("ORIGINAL")}); + startBroker(brokerService); + + + assertTrue("broker alive", brokerService.isStarted()); + printDestinations(); + assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL"))); + + LOG.info("Adding destinations"); + + //apply new config + javaConfigBroker.setDestinations(new ActiveMQDestination[] { + new ActiveMQTopic("BEFORE"), new ActiveMQQueue("ORIGINAL"), new ActiveMQQueue("AFTER")}); + + printDestinations(); + + assertTrue("contains destinations", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return containsDestination(new ActiveMQQueue("ORIGINAL")) + && containsDestination(new ActiveMQTopic("BEFORE")) + && containsDestination(new ActiveMQQueue("AFTER")); + } + }, TimeUnit.MILLISECONDS.convert(SLEEP, TimeUnit.SECONDS))); + + + LOG.info("Removing destinations"); + //apply new config + javaConfigBroker.setDestinations(new ActiveMQDestination[] { + new ActiveMQTopic("BEFORE"), new ActiveMQQueue("AFTER")}); + printDestinations(); + assertTrue("contains destinations", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return containsDestination(new ActiveMQQueue("ORIGINAL")) + && containsDestination(new ActiveMQTopic("BEFORE")) + && containsDestination(new ActiveMQQueue("AFTER")); + } + }, TimeUnit.MILLISECONDS.convert(SLEEP, TimeUnit.SECONDS))); + } + + protected boolean containsDestination(ActiveMQDestination destination) throws Exception { + return Arrays.asList(brokerService.getRegionBroker().getDestinations()).contains(destination); + } + + protected void printDestinations() throws Exception { + ActiveMQDestination[] destinations = brokerService.getRegionBroker().getDestinations(); + for (ActiveMQDestination destination : destinations) { + LOG.info("Broker destination: " + destination.toString()); + } + } +}