Author: chirino
Date: Tue Feb 14 20:35:12 2006
New Revision: 377936
URL: http://svn.apache.org/viewcvs?rev=377936&view=rev
Log:
Fixed a few of the broken durable subscription tests.
Modified:
incubator/activemq/trunk/activemq-core/project.xml
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
Modified: incubator/activemq/trunk/activemq-core/project.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Tue Feb 14 20:35:12 2006
@@ -347,6 +347,9 @@
<!-- http://jira.activemq.org/jira/browse/AMQ-522 -->
<exclude>**/ProxyConnectorTest.*</exclude>
+ <!-- http://jira.activemq.org/jira/browse/AMQ-560 -->
+ <exclude>**/FanoutTransportBrokerTest.*</exclude>
+
</excludes>
</unitTest>
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
Tue Feb 14 20:35:12 2006
@@ -50,8 +50,8 @@
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
Tue Feb 14 20:35:12 2006
@@ -48,8 +48,8 @@
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
Tue Feb 14 20:35:12 2006
@@ -48,8 +48,8 @@
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
Tue Feb 14 20:35:12 2006
@@ -50,8 +50,8 @@
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Tue Feb 14 20:35:12 2006
@@ -73,7 +73,7 @@
public Destination addDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
log.debug("Adding destination: "+destination);
- Destination dest = createDestination(destination);
+ Destination dest = createDestination(context, destination);
dest.start();
synchronized(destinationsMutex){
destinations.put(destination,dest);
@@ -241,7 +241,7 @@
}
protected abstract Subscription createSubscription(ConnectionContext
context, ConsumerInfo info) throws Throwable;
- abstract protected Destination createDestination(ActiveMQDestination
destination) throws Throwable;
+ abstract protected Destination createDestination(ConnectionContext
context, ActiveMQDestination destination) throws Throwable;
public boolean isAutoCreateDestinations() {
return autoCreateDestinations;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Tue Feb 14 20:35:12 2006
@@ -27,76 +27,66 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class DurableTopicSubscription extends PrefetchSubscription {
- final protected String clientId;
- final protected String subscriptionName;
- final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
-
- boolean active=true;
- boolean recovered=true;
+ private final ConcurrentHashMap redeliveredMessages = new
ConcurrentHashMap();
+ private final ConcurrentHashMap destinations = new ConcurrentHashMap();
+ private final SubscriptionKey subscriptionKey;
+ private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
- this.clientId = context.getClientId();
- this.subscriptionName = info.getSubcriptionName();
+ subscriptionKey = new SubscriptionKey(context.getClientId(),
info.getSubcriptionName());
}
- public DurableTopicSubscription(Broker broker,SubscriptionInfo info)
throws InvalidSelectorException {
- super(broker,null, createFakeConsumerInfo(info));
- this.clientId = info.getClientId();
- this.subscriptionName = info.getSubcriptionName();
- active=false;
- recovered=false;
- }
-
- private static ConsumerInfo createFakeConsumerInfo(SubscriptionInfo info) {
- ConsumerInfo rc = new ConsumerInfo();
- rc.setSelector(info.getSelector());
- rc.setSubcriptionName(info.getSubcriptionName());
- rc.setDestination(info.getDestination());
- return rc;
- }
-
synchronized public boolean isActive() {
return active;
}
- synchronized public boolean isRecovered() {
- return recovered;
- }
protected boolean isFull() {
return !active || super.isFull();
}
synchronized public void gc() {
- if( !active && recovered ) {
- recovered = false;
-
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
- // node.decrementTargetCount();
- iter.remove();
+ }
+
+ synchronized public void add(ConnectionContext context, Destination
destination) throws Throwable {
+ super.add(context, destination);
+ destinations.put(destination.getActiveMQDestination(), destination);
+ if( active ) {
+ Topic topic = (Topic) destination;
+ topic.activate(context, this);
+ }
+ }
+
+ synchronized public void activate(ConnectionContext context, ConsumerInfo
info) throws Throwable {
+ if( !active ) {
+ this.active = true;
+ this.context = context;
+ this.info = info;
+ for (Iterator iter = destinations.values().iterator();
iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.activate(context, this);
}
-
- for (Iterator iter = matched.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
- // node.decrementTargetCount();
- iter.remove();
+ if( !isFull() ) {
+ dispatchMatched();
}
-
- delivered=0;
}
}
- synchronized public void deactivate() {
+ synchronized public void deactivate() throws Throwable {
active=false;
+ for (Iterator iter = destinations.values().iterator();
iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.deactivate(context, this);
+ }
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-
+
+ // Mark the dispatched messages as redelivered for next time.
MessageReference node = (MessageReference) iter.next();
Integer count = (Integer)
redeliveredMessages.get(node.getMessageId());
if( count !=null ) {
@@ -105,32 +95,16 @@
redeliveredMessages.put(node.getMessageId(), new Integer(1));
}
- // Undo the dispatch.
- matched.addFirst(node);
iter.remove();
}
+ for (Iterator iter = matched.iterator(); iter.hasNext();) {
+ MessageReference node = (MessageReference) iter.next();
+ // node.decrementTargetCount();
+ iter.remove();
+ }
delivered=0;
}
- synchronized public void activate(ConnectionContext context, ConsumerInfo
info) throws Throwable {
- if( !active ) {
- this.active = true;
- this.context = context;
- this.info = info;
- if( !recovered ) {
- recovered=true;
- for (Iterator iter = destinations.iterator(); iter.hasNext();)
{
- Topic topic = (Topic) iter.next();
- topic.recover(context, this, false);
- }
- } else {
- if( !isFull() ) {
- dispatchMatched();
- }
- }
- }
- }
-
protected MessageDispatch createMessageDispatch(MessageReference node,
Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
@@ -141,7 +115,9 @@
}
synchronized public void add(MessageReference node) throws Throwable {
- assert recovered;
+ if( !active ) {
+ return;
+ }
node = new IndirectMessageReference(node.getRegionDestination(),
(Message) node);
super.add(node);
node.decrementReferenceCount();
@@ -152,7 +128,6 @@
}
public synchronized void acknowledge(ConnectionContext context, MessageAck
ack) throws Throwable {
- assert recovered;
super.acknowledge(context, ack);
}
@@ -163,7 +138,7 @@
}
public String getSubscriptionName() {
- return subscriptionName;
+ return subscriptionKey.getSubscriptionName();
}
public String toString() {
@@ -177,7 +152,11 @@
}
public String getClientId() {
- return clientId;
+ return subscriptionKey.getClientId();
+ }
+
+ public SubscriptionKey getSubscriptionKey() {
+ return subscriptionKey;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Feb 14 20:35:12 2006
@@ -37,6 +37,7 @@
* @version $Revision: 1.15 $
*/
abstract public class PrefetchSubscription extends AbstractSubscription{
+
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList matched=new LinkedList();
final protected LinkedList dispatched=new LinkedList();
@@ -44,13 +45,17 @@
int preLoadLimit=1024*100;
int preLoadSize=0;
boolean dispatching=false;
-
+
+ long enqueueCounter;
+ long dispatchCounter;
+
public PrefetchSubscription(Broker broker,ConnectionContext
context,ConsumerInfo info)
throws InvalidSelectorException{
super(broker,context,info);
}
synchronized public void add(MessageReference node) throws Throwable{
+ enqueueCounter++;
if(!isFull()&&!isSlaveBroker()){
dispatch(node);
}else{
@@ -244,8 +249,9 @@
}
// Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
+ dispatchCounter++;
MessageDispatch md=createMessageDispatch(node,message);
- dispatched.addLast(node);
+ dispatched.addLast(node);
incrementPreloadSize(node.getMessage().getSize());
if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
@@ -325,4 +331,13 @@
*/
protected void acknowledge(ConnectionContext context,final MessageAck
ack,final MessageReference node)
throws IOException{}
+
+
+ public long getDispatchCounter() {
+ return dispatchCounter;
+ }
+
+ public long getEnqueueCounter() {
+ return enqueueCounter;
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
Tue Feb 14 20:35:12 2006
@@ -54,7 +54,7 @@
// Implementation methods
//
-------------------------------------------------------------------------
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
MessageStore store =
persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store,
destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Tue Feb 14 20:35:12 2006
@@ -38,7 +38,7 @@
setAutoCreateDestinations(false);
}
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)
destination;
return new Queue(destination, memoryManager, null,
destinationStatistics, taskRunnerFactory) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
Tue Feb 14 20:35:12 2006
@@ -37,7 +37,7 @@
setAutoCreateDestinations(false);
}
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)
destination;
return new Topic(destination, null, memoryManager,
destinationStatistics, taskRunnerFactory) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Tue Feb 14 20:35:12 2006
@@ -42,8 +42,8 @@
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* The Topic is a destination that sends a copy of a message to every active
@@ -64,7 +64,7 @@
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new
FixedCountSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new
SharedDeadLetterStrategy();
- private AtomicInteger durableSubscriberCounter = new AtomicInteger();
+ private final ConcurrentHashMap durableSubcribers = new
ConcurrentHashMap();
public Topic(ActiveMQDestination destination, TopicMessageStore store,
UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
@@ -72,15 +72,6 @@
this.destination = destination;
this.store = store;
this.usageManager = memoryManager;
-
- // TODO: switch back when cache is working again.
- // this.cache = cache;
- // destinationStatistics.setMessagesCached(cache.getMessagesCached());
- // CacheEvictionUsageListener listener = new
- // CacheEvictionUsageListener(memoryManager, 90, 50, taskFactory);
- // listener.add(cache);
- // this.memoryManager.addUsageListener(listener);
-
this.destinationStatistics.setParent(parentStats);
}
@@ -89,142 +80,147 @@
}
public void addSubscription(ConnectionContext context, final Subscription
sub) throws Throwable {
- destinationStatistics.getConsumers().increment();
+
sub.add(context, this);
- if (sub.getConsumerInfo().isDurable()) {
- recover(context, (DurableTopicSubscription) sub, true);
- }
- else {
- recover(context, sub);
- }
- }
- /**
- * Used to recover the message list non durable subscriptions. Recovery
only happens if the consumer is
- * retroactive.
- *
- * @param context
- * @param sub
- * @throws Throwable
- */
- private void recover(ConnectionContext context, final Subscription sub)
throws Throwable {
- if (sub.getConsumerInfo().isRetroactive()) {
+ if ( !sub.getConsumerInfo().isDurable() ) {
+
+ destinationStatistics.getConsumers().increment();
- // synchronize with dispatch method so that no new messages are
sent
- // while we are recovering a subscription to avoid out of order
messages.
- dispatchValve.turnOff();
- try {
+ // Do a retroactive recovery if needed.
+ if (sub.getConsumerInfo().isRetroactive()) {
+ // synchronize with dispatch method so that no new messages
are sent
+ // while we are recovering a subscription to avoid out of
order messages.
+ dispatchValve.turnOff();
+ try {
+
+ synchronized(consumers) {
+ consumers.add(sub);
+ }
+ subscriptionRecoveryPolicy.recover(context, this, sub);
+
+ } finally {
+ dispatchValve.turnOn();
+ }
+
+ } else {
synchronized(consumers) {
consumers.add(sub);
}
- subscriptionRecoveryPolicy.recover(context, this, sub);
-
- } finally {
- dispatchValve.turnOn();
- }
-
+ }
} else {
+ DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+ durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
+ }
+ }
+
+ public void removeSubscription(ConnectionContext context, Subscription
sub) throws Throwable {
+ if ( !sub.getConsumerInfo().isDurable() ) {
+ destinationStatistics.getConsumers().decrement();
synchronized(consumers) {
- consumers.add(sub);
+ consumers.remove(sub);
}
}
+ sub.remove(context, this);
}
-
- /**
- * Used to recover the message list for a durable subscription.
- *
- * @param context
- * @param sub
- * @param initialActivation
- * @throws Throwable
- */
- public void recover(ConnectionContext context, final
DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
-
+
+ public void addInactiveSubscription(ConnectionContext context,
DurableTopicSubscription sub) throws Throwable {
+ sub.add(context, this);
+ destinationStatistics.getConsumers().increment();
+ durableSubcribers.put(sub.getSubscriptionKey(), sub);
+ }
+
+ public void deleteSubscription(ConnectionContext context, SubscriptionKey
key) throws IOException {
+ if (store != null) {
+ store.deleteSubscription(key.clientId, key.subscriptionName);
+ durableSubcribers.remove(key);
+ destinationStatistics.getConsumers().decrement();
+ }
+ }
+
+ public void activate(ConnectionContext context, final
DurableTopicSubscription subscription) throws Throwable {
+
// synchronize with dispatch method so that no new messages are sent
// while
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
-
- boolean persistenceWasOptimized = canOptimizeOutPersistence();
- if (initialActivation) {
- synchronized(consumers) {
- consumers.add(sub);
- durableSubscriberCounter.incrementAndGet();
+
+ synchronized(consumers) {
+ consumers.add(subscription);
+ }
+
+ if (store == null )
+ return;
+
+ // Recover the durable subscription.
+ String clientId = subscription.getClientId();
+ String subscriptionName = subscription.getSubscriptionName();
+ String selector = subscription.getConsumerInfo().getSelector();
+ SubscriptionInfo info = store.lookupSubscription(clientId,
subscriptionName);
+ if (info != null) {
+ // Check to see if selector changed.
+ String s1 = info.getSelector();
+ if (s1 == null ^ selector == null || (s1 != null &&
!s1.equals(selector))) {
+ // Need to delete the subscription
+ store.deleteSubscription(clientId, subscriptionName);
+ info = null;
}
}
-
- if (store != null) {
- String clientId = sub.getClientId();
- String subscriptionName = sub.getSubscriptionName();
- String selector = sub.getConsumerInfo().getSelector();
- SubscriptionInfo info = store.lookupSubscription(clientId,
subscriptionName);
- if (info != null) {
- // Check to see if selector changed.
- String s1 = info.getSelector();
- if (s1 == null ^ selector == null || (s1 != null &&
!s1.equals(selector))) {
- // Need to delete the subscription
- store.deleteSubscription(clientId, subscriptionName);
- info = null;
+ // Do we need to create the subscription?
+ if (info == null) {
+ store.addSubsciption(clientId, subscriptionName, selector,
subscription.getConsumerInfo().isRetroactive());
+ }
+
+ final MessageEvaluationContext msgContext = new
MessageEvaluationContext();
+ msgContext.setDestination(destination);
+ store.recoverSubscription(clientId, subscriptionName, new
MessageRecoveryListener() {
+ public void recoverMessage(Message message) throws Throwable {
+ message.setRegionDestination(Topic.this);
+ try {
+ msgContext.setMessageReference(message);
+ if (subscription.matches(message, msgContext)) {
+ subscription.add(message);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (IOException e) {
+ // TODO: Need to handle this better.
+ e.printStackTrace();
}
}
- // Do we need to create the subscription?
- if (info == null) {
- store.addSubsciption(clientId, subscriptionName, selector,
sub.getConsumerInfo().isRetroactive());
+
+ public void recoverMessageReference(String messageReference)
throws Throwable {
+ throw new RuntimeException("Should not be called.");
}
-
- if (sub.isRecovered()) {
- final MessageEvaluationContext msgContext = new
MessageEvaluationContext();
- msgContext.setDestination(destination);
- store.recoverSubscription(clientId, subscriptionName, new
MessageRecoveryListener() {
- public void recoverMessage(Message message) throws
Throwable {
- message.setRegionDestination(Topic.this);
- try {
- msgContext.setMessageReference(message);
- if (sub.matches(message, msgContext)) {
- sub.add(message);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (IOException e) {
- // TODO: Need to handle this better.
- e.printStackTrace();
- }
- }
-
- public void recoverMessageReference(String
messageReference) throws Throwable {
- throw new RuntimeException("Should not be
called.");
- }
- });
-
- if( initialActivation &&
sub.getConsumerInfo().isRetroactive() ) {
- // Then use the subscriptionRecoveryPolicy since there
will not be any messages in the persistent store.
- if( persistenceWasOptimized ) {
- subscriptionRecoveryPolicy.recover(context, this,
sub);
- } else {
- // TODO: implement something like
- //
subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
- }
- }
+ });
+
+ if( true && subscription.getConsumerInfo().isRetroactive() ) {
+ // If nothing was in the persistent store, then try to use the
recovery policy.
+ if( subscription.getEnqueueCounter() == 0 ) {
+ subscriptionRecoveryPolicy.recover(context, this,
subscription);
+ } else {
+ // TODO: implement something like
+ //
subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
}
}
-
+
}
finally {
dispatchValve.turnOn();
}
}
- public void removeSubscription(ConnectionContext context, Subscription
sub) throws Throwable {
- destinationStatistics.getConsumers().decrement();
- synchronized(consumers) {
+ public void deactivate(ConnectionContext context, DurableTopicSubscription
sub) throws Throwable {
+ synchronized(consumers) {
consumers.remove(sub);
}
sub.remove(context, this);
- }
+ }
+
public void send(final ConnectionContext context, final Message message)
throws Throwable {
@@ -259,18 +255,7 @@
}
private boolean canOptimizeOutPersistence() {
- return durableSubscriberCounter.get()==0;
- }
-
- public void createSubscription(SubscriptionKey key) {
- durableSubscriberCounter.incrementAndGet();
- }
-
- public void deleteSubscription(ConnectionContext context, SubscriptionKey
key) throws IOException {
- if (store != null) {
- store.deleteSubscription(key.clientId, key.subscriptionName);
- durableSubscriberCounter.decrementAndGet();
- }
+ return durableSubcribers.size()==0;
}
public String toString() {
@@ -423,5 +408,6 @@
}
}
}
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Tue Feb 14 20:35:12 2006
@@ -86,19 +86,21 @@
super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info);
-
+ sub = (DurableTopicSubscription)
durableSubscriptions.get(key);
}
else {
// Change the consumer id key of the durable sub.
if( sub.getConsumerInfo().getConsumerId()!=null )
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
subscriptions.put(info.getConsumerId(), sub);
- sub.activate(context, info);
}
}
else {
super.addConsumer(context, info);
+ sub = (DurableTopicSubscription) durableSubscriptions.get(key);
}
+
+ sub.activate(context, info);
}
else {
super.addConsumer(context, info);
@@ -145,7 +147,7 @@
// Implementation methods
//
-------------------------------------------------------------------------
- protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Throwable {
TopicMessageStore store =
persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
Topic topic = new Topic(destination, store, memoryManager,
destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
@@ -154,13 +156,30 @@
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
for (int i = 0; i < infos.length; i++) {
- log.info("Restoring durable subscription: "+infos[i]);
- createDurableSubscription(topic, infos[i]);
+
+ SubscriptionInfo info = infos[i];
+ log.debug("Restoring durable subscription: "+infos);
+ SubscriptionKey key = new SubscriptionKey(info);
+
+ // A single durable sub may be subscribing to multiple topics. so it might exist already.
+ DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+ if( sub == null ) {
+ sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info));
+ }
+ topic.addInactiveSubscription(context, sub);
}
}
return topic;
}
+
+ private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+ ConsumerInfo rc = new ConsumerInfo();
+ rc.setSelector(info.getSelector());
+ rc.setSubcriptionName(info.getSubcriptionName());
+ rc.setDestination(info.getDestination());
+ return rc;
+ }
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
if (policyMap != null) {
@@ -188,17 +207,6 @@
return new TopicSubscription(broker,context, info, memoryManager);
}
}
-
- public Subscription createDurableSubscription(Topic topic, SubscriptionInfo info) throws Throwable {
- SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
- topic.createSubscription(key);
- DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- sub = new DurableTopicSubscription(broker,info);
- sub.add(null, topic);
- durableSubscriptions.put(key, sub);
- return sub;
- }
-
/**
*/
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java Tue Feb 14 20:35:12 2006
@@ -16,17 +16,26 @@
*/
package org.apache.activemq.util;
+import org.apache.activemq.command.SubscriptionInfo;
+
public class SubscriptionKey {
+
public final String clientId;
public final String subscriptionName;
private final int hashValue;
+
+ public SubscriptionKey(SubscriptionInfo info) {
+ this(info.getClientId(), info.getSubcriptionName());
+ }
+
public SubscriptionKey(String clientId, String subscriptionName) {
this.clientId = clientId;
this.subscriptionName = subscriptionName;
hashValue = clientId.hashCode()^subscriptionName.hashCode();
}
-
+
+
public int hashCode() {
return hashValue;
}
@@ -42,5 +51,13 @@
public String toString() {
return clientId+":"+subscriptionName;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java?rev=377936&r1=377935&r2=377936&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java Tue Feb 14 20:35:12 2006
@@ -185,7 +185,10 @@
Topic topic = session.createTopic("topic-"+getName());
MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
- MessageProducer producer = createProducer(session, topic);
+ // This case only works with persistent messages since transient messages
+ // are dropped when the consumer goes offline.
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(createTextMessage(session));
// Consume the message...