Author: rajdavies
Date: Thu Sep 4 11:10:45 2008
New Revision: 692181
URL: http://svn.apache.org/viewvc?rev=692181&view=rev
Log:
Added patch for https://issues.apache.org/activemq/browse/AMQ-1890
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=692181&r1=692180&r2=692181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Sep 4 11:10:45 2008
@@ -45,6 +45,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
@@ -412,6 +413,17 @@
if (subscriptionId == null && destination == null) {
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
}
+
+ // check if it is a durable subscription
+ String durable = command.getHeaders().get("activemq.subscriptionName");
+ if (durable != null) {
+ RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+ info.setClientId(durable);
+ info.setSubscriptionName(durable);
+ info.setConnectionId(connectionId);
+ sendToActiveMQ(info, createResponseHandler(command));
+ return;
+ }
// TODO: Unsubscribing using a destination is a bit wierd if multiple
// subscriptions
@@ -426,7 +438,7 @@
return;
}
}
-
+
throw new ProtocolException("No subscription matched.");
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=692181&r1=692180&r2=692181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Sep 4 11:10:45 2008
@@ -35,11 +35,15 @@
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
@@ -857,6 +861,58 @@
stompConnection.sendFrame(frame);
}
+    /**
+     * Verifies that an UNSUBSCRIBE frame carrying the
+     * {@code activemq.subscriptionName} header removes the durable topic
+     * subscription from the broker (AMQ-1890).
+     *
+     * The number of durable subscribers is read through the broker's JMX
+     * view before subscribing, after subscribing, and after unsubscribing
+     * from a second connection that reuses the same client-id.
+     *
+     * @throws Exception if the JMX lookup or the STOMP exchange fails, or if
+     *         the thread is interrupted while waiting for the broker
+     */
+    public void testDurableUnsub() throws Exception {
+        // get broker JMX view
+        MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
+
+        String domain = "org.apache.activemq";
+        ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+        BrokerViewMBean view = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+
+        // connect
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        // expected value first, so a failure message reads correctly
+        assertEquals(0, view.getDurableTopicSubscribers().length);
+
+        // subscribe
+        frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // wait a bit for MBean to get refreshed; an interrupt is allowed to
+        // propagate and fail the test rather than being silently swallowed
+        Thread.sleep(100);
+
+        assertEquals(1, view.getDurableTopicSubscribers().length);
+        // disconnect
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        Thread.sleep(100);
+
+        // reconnect
+        stompConnect();
+        // connect
+        frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        // unsubscribe
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // give the broker time to process the removal before checking JMX
+        Thread.sleep(100);
+        assertEquals(0, view.getDurableTopicSubscribers().length);
+    }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;