Updated Branches: refs/heads/trunk 42d006f2b -> 4c38b03d1
https://issues.apache.org/jira/browse/AMQ-5035 Fix and test for variations in configuration settings that result in a non-region broker as the AdvisoryBroker's next instance. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4c38b03d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4c38b03d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4c38b03d Branch: refs/heads/trunk Commit: 4c38b03d14f2d8e803d79272ae08d79b52105959 Parents: 42d006f Author: Timothy Bish <[email protected]> Authored: Mon Feb 10 18:14:41 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Mon Feb 10 18:14:41 2014 -0500 ---------------------------------------------------------------------- .../activemq/advisory/AdvisoryBroker.java | 33 +++++- .../org/apache/activemq/bugs/AMQ5035Test.java | 82 ++++++++++++++ .../apache/activemq/bugs/amq5035/activemq.xml | 109 +++++++++++++++++++ 3 files changed, 222 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 5c90287..d48bf16 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; @@ -34,7 +35,21 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -261,7 +276,21 @@ public class AdvisoryBroker extends BrokerFilter { @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); - DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key); + + RegionBroker regionBroker = null; + if (next instanceof RegionBroker) { + regionBroker = (RegionBroker) next; + } else { + BrokerService service = next.getBrokerService(); + regionBroker = (RegionBroker) service.getRegionBroker(); + } + + if (regionBroker == null) { + LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call"); + throw new IllegalStateException("No RegionBroker found."); + } + + DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key); super.removeSubscription(context, info); http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java new file mode 100644 index 0000000..a253bc8 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ5035Test { + + private static final String CLIENT_ID = "amq-test-client-id"; + private static final String DURABLE_SUB_NAME = "testDurable"; + + private final String xbean = "xbean:"; + private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035"; + + private static BrokerService brokerService; + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml"); + connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testFoo() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = factory.createConnection(); + connection.setClientID(CLIENT_ID); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("Test.Topic"); + MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME); + consumer.close(); + + BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME); + brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME); + } + + private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException { + ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); + BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); + assertNotNull(view); + return view; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml new file mode 100644 index 0000000..1bd5a42 --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml @@ -0,0 +1,109 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- START SNIPPET: example --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <!-- + The <broker> element is used to configure the ActiveMQ broker. + --> + <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true"> + + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry topic=">" > + <!-- The constantPendingMessageLimitStrategy is used to prevent + slow topic consumers to block producers and affect other consumers + by limiting the number of messages that are retained + For more information, see: + + http://activemq.apache.org/slow-consumer-handling.html + --> + <pendingMessageLimitStrategy> + <constantPendingMessageLimitStrategy limit="1000"/> + </pendingMessageLimitStrategy> + </policyEntry> + </policyEntries> + </policyMap> + </destinationPolicy> + + <!-- + The managementContext is used to configure how ActiveMQ is exposed in + JMX. By default, ActiveMQ uses the MBean server that is started by + the JVM. For more information, see: + + http://activemq.apache.org/jmx.html + --> + <managementContext> + <managementContext createConnector="false"/> + </managementContext> + + <!-- + Configure message persistence for the broker. The default persistence + mechanism is the KahaDB store (identified by the kahaDB tag). + For more information, see: + + http://activemq.apache.org/persistence.html + --> + <persistenceAdapter> + <kahaDB directory="${activemq.data}/kahadb"/> + </persistenceAdapter> + + + <!-- + The systemUsage controls the maximum amount of space the broker will + use before disabling caching and/or slowing down producers. For more information, see: + http://activemq.apache.org/producer-flow-control.html + --> + <systemUsage> + <systemUsage> + <memoryUsage> + <memoryUsage percentOfJvmHeap="70" /> + </memoryUsage> + <storeUsage> + <storeUsage limit="100 gb"/> + </storeUsage> + <tempUsage> + <tempUsage limit="50 gb"/> + </tempUsage> + </systemUsage> + </systemUsage> + + <!-- + The transport connectors expose ActiveMQ over a given protocol to + clients and other brokers. For more information, see: + + http://activemq.apache.org/configuring-transports.html + --> + <transportConnectors> + <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> + <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> + </transportConnectors> + + <!-- destroy the spring context on shutdown to stop jetty --> + <shutdownHooks> + <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> + </shutdownHooks> + + </broker> + +</beans> +<!-- END SNIPPET: example -->
