Author: aco
Date: Tue Jul 11 02:27:17 2006
New Revision: 420774
URL: http://svn.apache.org/viewvc?rev=420774&view=rev
Log:
Allow option to unsubscribe durable subscriptions after each run
Modified:
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java
Modified:
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=420774&r1=420773&r2=420774&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java Tue Jul 11 02:27:17 2006
@@ -88,6 +88,10 @@
incThroughput();
}
} finally {
+ if (client.isDurable() && client.isUnsubscribe()) {
+ log.info("Unsubscribing durable subscriber: " + getClientName());
+ getSession().unsubscribe(getClientName());
+ }
getConnection().close();
}
}
@@ -108,6 +112,10 @@
recvCount++;
}
} finally {
+ if (client.isDurable() && client.isUnsubscribe()) {
+ log.info("Unsubscribing durable subscriber: " + getClientName());
+ getSession().unsubscribe(getClientName());
+ }
getConnection().close();
}
}
@@ -132,6 +140,10 @@
throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
}
} finally {
+ if (client.isDurable() && client.isUnsubscribe()) {
+ log.info("Unsubscribing durable subscriber: " + getClientName());
+ getSession().unsubscribe(getClientName());
+ }
getConnection().close();
}
}
@@ -161,6 +173,10 @@
throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
}
} finally {
+ if (client.isDurable() && client.isUnsubscribe()) {
+ log.info("Unsubscribing durable subscriber: " + getClientName());
+ getSession().unsubscribe(getClientName());
+ }
getConnection().close();
}
}
@@ -175,8 +191,9 @@
String clientName = getClientName();
if (clientName == null) {
clientName = "JmsConsumer";
+ setClientName(clientName);
}
- log.info("Creating durable subscriber (" + getConnection().getClientID() + ") to: " + dest.toString());
+ log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName);
} else {
log.info("Creating non-durable consumer to: " + dest.toString());
@@ -190,8 +207,9 @@
String clientName = getClientName();
if (clientName == null) {
clientName = "JmsConsumer";
+ setClientName(clientName);
}
- log.info("Creating durable subscriber (" + getConnection().getClientID() + ") to: " + dest.toString());
+ log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
} else {
log.info("Creating non-durable consumer to: " + dest.toString());
Modified:
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java?rev=420774&r1=420773&r2=420774&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java Tue Jul 11 02:27:17 2006
@@ -20,6 +20,7 @@
public static final String COUNT_BASED_RECEIVING = "count"; // Receive a specific count of messages
protected boolean durable = false; // Consumer is a durable subscriber
+ protected boolean unsubscribe = true; // If true, unsubscribe a durable subscriber after it finishes running
protected boolean asyncRecv = true; // If true, use onMessage() to receive messages, else use receive()
protected long recvCount = 1000000; // Receive a million messages by default
@@ -32,6 +33,14 @@
public void setDurable(boolean durable) {
this.durable = durable;
+ }
+
+ public boolean isUnsubscribe() {
+ return unsubscribe;
+ }
+
+ public void setUnsubscribe(boolean unsubscribe) {
+ this.unsubscribe = unsubscribe;
}
public boolean isAsyncRecv() {