Author: dejanb
Date: Fri Dec 10 18:14:05 2010
New Revision: 1044465
URL: http://svn.apache.org/viewvc?rev=1044465&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3081 - Durable subscriptions are not
removed from mbean
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1044465&r1=1044464&r2=1044465&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Fri Dec 10 18:14:05 2010
@@ -266,6 +266,7 @@ public class ManagedRegionBroker extends
ObjectName inactiveName =
subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
+ managementContext.unregisterMBean(inactiveName);
}
} catch (Exception e) {
LOG.error("Failed to unregister subscription " + sub, e);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1044465&r1=1044464&r2=1044465&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Fri Dec 10 18:14:05 2010
@@ -21,16 +21,20 @@ import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.Connection;
import javax.jms.Session;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
import java.io.File;
import java.lang.management.ManagementFactory;
+import java.util.List;
+
public class DurableSubscriptionUnsubscribeTest extends TestSupport {
@@ -63,142 +67,100 @@ public class DurableSubscriptionUnsubscr
}
public void doJMXUnsubscribe(boolean restart) throws Exception {
- for (int i = 0; i < 100; i++) {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId" + i);
- session.close();
- }
+ createSubscriptions();
- Thread.sleep(2 * 1000);
+ Thread.sleep(1000);
+ assertCount(100, 0);
if (restart) {
- stopBroker();
- startBroker(false);
+ restartBroker();
+ assertCount(100, 0);
}
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName[] subscriptions =
broker.getAdminView().getDurableTopicSubscribers();
- ObjectName[] inactive =
broker.getAdminView().getInactiveDurableTopicSubscribers();
+ ObjectName[] subs =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- for (ObjectName subscription: subscriptions) {
- mbs.invoke(subscription, "destroy", null, null);
- }
- for (ObjectName subscription: inactive) {
- mbs.invoke(subscription, "destroy", null, null);
- }
+ for (int i = 0; i < subs.length; i++) {
+ ObjectName sub = subs[i];
+ mbs.invoke(sub, "destroy", null, null);
- Thread.sleep(2 * 1000);
+ if (i % 20 == 0) {
+ Thread.sleep(1000);
+ assertCount(100 - i - 1, 0);
+ }
+ }
- subscriptions = broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ Thread.sleep(1000);
+ assertCount(0, 0);
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ if (restart) {
+ restartBroker();
+ assertCount(0, 0);
+ }
}
- public void testInactiveSubscriptions() throws Exception {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId");
-
- ObjectName[] subscriptions =
broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(1, subscriptions.length);
-
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
-
- session.close();
-
- Thread.sleep(1000);
-
- subscriptions = broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
-
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(1, subscriptions.length);
-
- session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId");
-
- Thread.sleep(1000);
-
- subscriptions = broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(1, subscriptions.length);
-
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
-
- session.close();
- session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ public void doConnectionUnsubscribe(boolean restart) throws Exception {
+ createSubscriptions();
- Thread.sleep(1000);
+ Thread.sleep(1000);
+ assertCount(100, 0);
- subscriptions = broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId1");
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(1, subscriptions.length);
+ Thread.sleep(1000);
+ assertCount(100, 1);
- session.unsubscribe("SubsId");
+ Session session2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session2.createDurableSubscriber(topic, "SubsId2");
- Thread.sleep(1000);
+ Thread.sleep(1000);
+ assertCount(100, 2);
- subscriptions = broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ session.close();
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ Thread.sleep(1000);
+ assertCount(100, 1);
- session.close();
+ session2.close();
- }
-
- public void doConnectionUnsubscribe(boolean restart) throws Exception {
- for (int i = 0; i < 100; i++) {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId" + i);
- session.close();
- }
-
- Thread.sleep(2 * 1000);
+ Thread.sleep(1000);
+ assertCount(100, 0);
if (restart) {
- stopBroker();
- startBroker(false);
+ restartBroker();
+ assertCount(100, 0);
}
- ObjectName[] subscriptions =
broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
-
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(100, subscriptions.length);
-
for (int i = 0; i < 100; i++) {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
session.unsubscribe("SubsId" + i);
session.close();
- }
- Thread.sleep(2 * 1000);
+ if (i % 20 == 0) {
+ Thread.sleep(1000);
+ assertCount(100 - i - 1, 0);
+ }
+ }
- subscriptions = broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ Thread.sleep(1000);
+ assertCount(0, 0);
- subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ if (restart) {
+ restartBroker();
+ assertCount(0, 0);
+ }
}
public void doDirectUnsubscribe(boolean restart) throws Exception {
- for (int i = 0; i < 100; i++) {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId" + i);
- session.close();
- }
+ createSubscriptions();
- Thread.sleep(2 * 1000);
+ Thread.sleep(1000);
+ assertCount(100, 0);
if (restart) {
- stopBroker();
- startBroker(false);
+ restartBroker();
+ assertCount(100, 0);
}
for (int i = 0; i < 100; i++) {
@@ -209,19 +171,78 @@ public class DurableSubscriptionUnsubscr
context.setBroker(broker.getRegionBroker());
context.setClientId(getName());
broker.getRegionBroker().removeSubscription(context, info);
+
+ if (i % 20 == 0) {
+ assertCount(100 - i - 1, 0);
+ }
}
- Thread.sleep(2 * 1000);
+ assertCount(0, 0);
- ObjectName[] subscriptions =
broker.getAdminView().getDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ if (restart) {
+ restartBroker();
+ assertCount(0, 0);
+ }
+ }
+
+ private void createSubscriptions() throws Exception {
+ for (int i = 0; i < 100; i++) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId" + i);
+ session.close();
+ }
+ }
+
+
+ private void assertCount(int all, int active) throws Exception {
+ int inactive = all - active;
+ // broker check
+ Destination destination = broker.getDestination(topic);
+ List<Subscription> subs = destination.getConsumers();
+ int cActive = 0, cInactive = 0;
+ for (Subscription sub: subs) {
+ if (sub instanceof DurableTopicSubscription) {
+ DurableTopicSubscription durable = (DurableTopicSubscription)
sub;
+ if (durable.isActive())
+ cActive++;
+ else
+ cInactive++;
+ }
+ }
+ assertEquals(active, cActive);
+ assertEquals(inactive, cInactive);
+
+ // admin view
+ ObjectName[] subscriptions =
broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(active, subscriptions.length);
subscriptions =
broker.getAdminView().getInactiveDurableTopicSubscribers();
- assertEquals(0, subscriptions.length);
+ assertEquals(inactive, subscriptions.length);
+
+ // check the strange false MBean
+ if (all == 0)
+ assertEquals(0, countMBean());
+ }
+
+ private int countMBean() throws MalformedObjectNameException,
InstanceNotFoundException {
+ int count = 0;
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ for (int i = 0; i < 100; i++) {
+ String name = "org.apache.activemq:BrokerName=" + getName() +
",Type=Subscription,active=false,name=" + getName() + "_SubsId" + i;
+ ObjectName sub = new ObjectName(name);
+ try {
+ ObjectInstance oi = mbs.getObjectInstance(sub);
+ count++;
+ }
+ catch (InstanceNotFoundException ignore) {
+ // this should happen
+ }
+ }
+ return count;
}
private void startBroker(boolean deleteMessages) throws Exception {
- broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+ broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
broker.setUseJmx(true);
broker.setBrokerName(getName());
@@ -233,7 +254,11 @@ public class DurableSubscriptionUnsubscr
broker.setDeleteAllMessagesOnStartup(true);
}
+
+ broker.setKeepDurableSubsActive(true);
+
broker.start();
+ broker.waitUntilStarted();
connection = createConnection();
}
@@ -243,11 +268,18 @@ public class DurableSubscriptionUnsubscr
connection.close();
connection = null;
- if (broker != null)
+ if (broker != null) {
broker.stop();
+ broker.waitUntilStopped();
+ }
broker = null;
}
+ private void restartBroker() throws Exception {
+ stopBroker();
+ startBroker(false);
+ }
+
protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
return new ActiveMQConnectionFactory("vm://" + getName() +
"?waitForStart=5000&create=false");
}
@@ -273,4 +305,4 @@ public class DurableSubscriptionUnsubscr
rc.start();
return rc;
}
-}
+}
\ No newline at end of file