Author: tabish
Date: Thu Apr 4 20:30:00 2013
New Revision: 1464729
URL: http://svn.apache.org/r1464729
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4356
The actual Durable subscription wasn't getting removed from the Store so on
restart they were recovered.
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
(with props)
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1464729&r1=1464728&r2=1464729&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Thu Apr 4 20:30:00 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.List;
import java.util.Set;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -39,89 +40,108 @@ import org.apache.activemq.usage.Usage;
*/
public class DestinationFilter implements Destination {
- private final Destination next;
+ protected final Destination next;
public DestinationFilter(Destination next) {
this.next = next;
}
+ @Override
public void acknowledge(ConnectionContext context, Subscription sub,
MessageAck ack, MessageReference node) throws IOException {
next.acknowledge(context, sub, ack, node);
}
+ @Override
public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
next.addSubscription(context, sub);
}
+ @Override
public Message[] browse() {
return next.browse();
}
+ @Override
public void dispose(ConnectionContext context) throws IOException {
next.dispose(context);
}
+ @Override
public boolean isDisposed() {
return next.isDisposed();
}
+ @Override
public void gc() {
next.gc();
}
+ @Override
public void markForGC(long timeStamp) {
next.markForGC(timeStamp);
}
+ @Override
public boolean canGC() {
return next.canGC();
}
+ @Override
public long getInactiveTimoutBeforeGC() {
return next.getInactiveTimoutBeforeGC();
}
+ @Override
public ActiveMQDestination getActiveMQDestination() {
return next.getActiveMQDestination();
}
+ @Override
public DeadLetterStrategy getDeadLetterStrategy() {
return next.getDeadLetterStrategy();
}
+ @Override
public DestinationStatistics getDestinationStatistics() {
return next.getDestinationStatistics();
}
+ @Override
public String getName() {
return next.getName();
}
+ @Override
public MemoryUsage getMemoryUsage() {
return next.getMemoryUsage();
}
- @Override
- public void setMemoryUsage(MemoryUsage memoryUsage) {
- next.setMemoryUsage(memoryUsage);
- }
+ @Override
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ next.setMemoryUsage(memoryUsage);
+ }
+ @Override
public void removeSubscription(ConnectionContext context, Subscription
sub, long lastDeliveredSequenceId) throws Exception {
next.removeSubscription(context, sub, lastDeliveredSequenceId);
}
+ @Override
public void send(ProducerBrokerExchange context, Message messageSend)
throws Exception {
next.send(context, messageSend);
}
+ @Override
public void start() throws Exception {
next.start();
}
+ @Override
public void stop() throws Exception {
next.stop();
}
+ @Override
public List<Subscription> getConsumers() {
return next.getConsumers();
}
@@ -143,102 +163,127 @@ public class DestinationFilter implement
}
}
+ @Override
public MessageStore getMessageStore() {
return next.getMessageStore();
}
+ @Override
public boolean isProducerFlowControl() {
return next.isProducerFlowControl();
}
+ @Override
public void setProducerFlowControl(boolean value) {
next.setProducerFlowControl(value);
}
+ @Override
public boolean isAlwaysRetroactive() {
return next.isAlwaysRetroactive();
}
+ @Override
public void setAlwaysRetroactive(boolean value) {
next.setAlwaysRetroactive(value);
}
+ @Override
public void setBlockedProducerWarningInterval(long
blockedProducerWarningInterval) {
next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
}
+ @Override
public long getBlockedProducerWarningInterval() {
return next.getBlockedProducerWarningInterval();
}
+ @Override
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
next.addProducer(context, info);
}
+ @Override
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
next.removeProducer(context, info);
}
+ @Override
public int getMaxAuditDepth() {
return next.getMaxAuditDepth();
}
+ @Override
public int getMaxProducersToAudit() {
return next.getMaxProducersToAudit();
}
+ @Override
public boolean isEnableAudit() {
return next.isEnableAudit();
}
+ @Override
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
}
+ @Override
public void setMaxAuditDepth(int maxAuditDepth) {
next.setMaxAuditDepth(maxAuditDepth);
}
+ @Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
next.setMaxProducersToAudit(maxProducersToAudit);
}
+ @Override
public boolean isActive() {
return next.isActive();
}
+ @Override
public int getMaxPageSize() {
return next.getMaxPageSize();
}
+ @Override
public void setMaxPageSize(int maxPageSize) {
next.setMaxPageSize(maxPageSize);
}
+ @Override
public boolean isUseCache() {
return next.isUseCache();
}
+ @Override
public void setUseCache(boolean useCache) {
next.setUseCache(useCache);
}
+ @Override
public int getMinimumMessageSize() {
return next.getMinimumMessageSize();
}
+ @Override
public void setMinimumMessageSize(int minimumMessageSize) {
next.setMinimumMessageSize(minimumMessageSize);
}
+ @Override
public void wakeup() {
next.wakeup();
}
+ @Override
public boolean isLazyDispatch() {
return next.isLazyDispatch();
}
+ @Override
public void setLazyDispatch(boolean value) {
next.setLazyDispatch(value);
}
@@ -247,70 +292,87 @@ public class DestinationFilter implement
next.messageExpired(context, prefetchSubscription, node);
}
+ @Override
public boolean iterate() {
return next.iterate();
}
+ @Override
public void fastProducer(ConnectionContext context, ProducerInfo
producerInfo) {
next.fastProducer(context, producerInfo);
}
+ @Override
public void isFull(ConnectionContext context, Usage<?> usage) {
next.isFull(context, usage);
}
+ @Override
public void messageConsumed(ConnectionContext context, MessageReference
messageReference) {
next.messageConsumed(context, messageReference);
}
+ @Override
public void messageDelivered(ConnectionContext context, MessageReference
messageReference) {
next.messageDelivered(context, messageReference);
}
+ @Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) {
next.messageDiscarded(context, sub, messageReference);
}
+ @Override
public void slowConsumer(ConnectionContext context, Subscription subs) {
next.slowConsumer(context, subs);
}
+ @Override
public void messageExpired(ConnectionContext context, Subscription subs,
MessageReference node) {
next.messageExpired(context, subs, node);
}
+ @Override
public int getMaxBrowsePageSize() {
return next.getMaxBrowsePageSize();
}
+ @Override
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
+ @Override
public void processDispatchNotification(MessageDispatchNotification
messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
}
+ @Override
public int getCursorMemoryHighWaterMark() {
return next.getCursorMemoryHighWaterMark();
}
+ @Override
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
+ @Override
public boolean isPrioritizedMessages() {
return next.isPrioritizedMessages();
}
+ @Override
public SlowConsumerStrategy getSlowConsumerStrategy() {
return next.getSlowConsumerStrategy();
}
+ @Override
public boolean isDoOptimzeMessageStorage() {
return next.isDoOptimzeMessageStorage();
}
+ @Override
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
}
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1464729&r1=1464728&r2=1464729&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Thu Apr 4 20:30:00 2013
@@ -31,6 +31,7 @@ import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
@@ -65,6 +66,7 @@ public class TopicRegion extends Abstrac
if
(broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 &&
broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup
Timer", true);
this.cleanupTask = new TimerTask() {
+ @Override
public void run() {
doCleanup();
}
@@ -193,10 +195,12 @@ public class TopicRegion extends Abstrac
destinationsLock.readLock().lock();
try {
for (Destination dest : destinations.values()) {
- //Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
+ } else if (dest instanceof VirtualTopicInterceptor) {
+ VirtualTopicInterceptor virtualTopic =
(VirtualTopicInterceptor) dest;
+ virtualTopic.getTopic().deleteSubscription(context, key);
}
}
} finally {
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1464729&r1=1464728&r2=1464729&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Thu Apr 4 20:30:00 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.regio
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
@@ -27,15 +28,13 @@ import org.apache.activemq.util.LRUCache
/**
* A Destination which implements <a
* href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
- *
- *
*/
public class VirtualTopicInterceptor extends DestinationFilter {
- private String prefix;
- private String postfix;
- private boolean local;
- private LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new
LRUCache<ActiveMQDestination,ActiveMQQueue>();
+ private final String prefix;
+ private final String postfix;
+ private final boolean local;
+ private final LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new
LRUCache<ActiveMQDestination,ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, String prefix, String
postfix, boolean local) {
super(next);
@@ -44,6 +43,11 @@ public class VirtualTopicInterceptor ext
this.local = local;
}
+ public Topic getTopic() {
+ return (Topic) this.next;
+ }
+
+ @Override
public void send(ProducerBrokerExchange context, Message message) throws
Exception {
if (!message.isAdvisory() && !(local && message.getBrokerPath() !=
null)) {
ActiveMQDestination queueConsumers =
getQueueConsumersWildcard(message.getDestination());
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java?rev=1464729&view=auto
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
(added)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
Thu Apr 4 20:30:00 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ4356Test {
+
+ private static BrokerService brokerService;
+ private static String BROKER_ADDRESS = "tcp://localhost:0";
+
+ private String connectionUri;
+ private ActiveMQConnectionFactory cf;
+ private final String CLIENT_ID = "AMQ4356Test";
+ private final String SUBSCRIPTION_NAME = "AMQ4356Test";
+
+ private void createBroker(boolean deleteOnStart) throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(true);
+ brokerService.setDeleteAllMessagesOnStartup(deleteOnStart);
+ connectionUri =
brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ }
+
+ private void startBroker() throws Exception {
+ createBroker(true);
+ }
+
+ private void restartBroker() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ createBroker(false);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ startBroker();
+ cf = new ActiveMQConnectionFactory(connectionUri);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+
+ @Test
+ public void testVirtualTopicUnsubDurable() throws Exception {
+ Connection connection = cf.createConnection();
+ connection.setClientID(CLIENT_ID);
+ connection.start();
+
+ // create consumer 'cluster'
+ ActiveMQQueue queue1 = new
ActiveMQQueue(getVirtualTopicConsumerName());
+ ActiveMQQueue queue2 = new
ActiveMQQueue(getVirtualTopicConsumerName());
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer c1 = session.createConsumer(queue1);
+ c1.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ }
+ });
+ MessageConsumer c2 = session.createConsumer(queue2);
+ c2.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ }
+ });
+
+ ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
+ MessageConsumer c3 = session.createDurableSubscriber(topic,
SUBSCRIPTION_NAME);
+
+ assertEquals(1,
brokerService.getAdminView().getDurableTopicSubscribers().length);
+ assertEquals(0,
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+ c3.close();
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(topic);
+ assertNotNull(producer);
+
+ int total = 10;
+ for (int i = 0; i < total; i++) {
+ producer.send(session.createTextMessage("message: " + i));
+ }
+
+ assertEquals(0,
brokerService.getAdminView().getDurableTopicSubscribers().length);
+ assertEquals(1,
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+ session.unsubscribe(SUBSCRIPTION_NAME);
+ connection.close();
+
+ assertEquals(0,
brokerService.getAdminView().getDurableTopicSubscribers().length);
+ assertEquals(0,
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+ restartBroker();
+
+ assertEquals(0,
brokerService.getAdminView().getDurableTopicSubscribers().length);
+ assertEquals(0,
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+ }
+
+ protected String getVirtualTopicName() {
+ return "VirtualTopic.TEST";
+ }
+
+ protected String getVirtualTopicConsumerName() {
+ return "Consumer.A.VirtualTopic.TEST";
+ }
+}
Propchange:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
------------------------------------------------------------------------------
svn:eol-style = native