Author: dejanb
Date: Mon Jun 29 10:41:30 2009
New Revision: 789283
URL: http://svn.apache.org/viewvc?rev=789283&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2303 - durable subsciber
recovery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=789283&r1=789282&r2=789283&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Mon Jun 29 10:41:30 2009
@@ -117,7 +117,6 @@
public void addSubscription(ConnectionContext context, final Subscription
sub) throws Exception {
- sub.add(context, this);
destinationStatistics.getConsumers().increment();
if (!sub.getConsumerInfo().isDurable()) {
@@ -133,6 +132,7 @@
try {
synchronized (consumers) {
+ sub.add(context, this);
consumers.add(sub);
}
subscriptionRecoveryPolicy.recover(context, this, sub);
@@ -143,10 +143,12 @@
} else {
synchronized (consumers) {
+ sub.add(context, this);
consumers.add(sub);
}
}
} else {
+ sub.add(context, this);
DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
@@ -178,11 +180,7 @@
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
-
- synchronized (consumers) {
- consumers.add(subscription);
- }
-
+
if (topicStore == null) {
return;
}
@@ -199,6 +197,10 @@
// Need to delete the subscription
topicStore.deleteSubscription(clientId, subscriptionName);
info = null;
+ } else {
+ synchronized (consumers) {
+ consumers.add(subscription);
+ }
}
}
// Do we need to create the subscription?
@@ -208,11 +210,15 @@
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
info.setDestination(getActiveMQDestination());
- // Thi destination is an actual destination id.
+ // This destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern
-
topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+ synchronized (consumers) {
+ consumers.add(subscription);
+
topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+ }
}
+
final MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
@@ -244,7 +250,6 @@
}
});
}
-
} finally {
dispatchValve.turnOn();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=789283&r1=789282&r2=789283&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Mon Jun 29 10:41:30 2009
@@ -565,7 +565,6 @@
*/
private void recover() throws IllegalStateException, IOException {
referenceStoreAdapter.clearMessages();
- referenceStoreAdapter.recoverState();
Location pos = null;
int redoCounter = 0;
LOG.info("Journal Recovery Started from: " + asyncDataManager);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=789283&r1=789282&r2=789283&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Mon Jun 29 10:41:30 2009
@@ -136,11 +136,11 @@
topicConnectionFactory = new
ActiveMQConnectionFactory(CONNECTION_URL);
try {
- topic = new ActiveMQTopic(TOPIC_NAME);
- topicConnection = topicConnectionFactory.createTopicConnection();
- topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
- topicPublisher = topicSession.createPublisher(topic);
- message = topicSession.createMessage();
+ topic = new ActiveMQTopic(TOPIC_NAME);
+ topicConnection =
topicConnectionFactory.createTopicConnection();
+ topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ topicPublisher = topicSession.createPublisher(topic);
+ message = topicSession.createMessage();
} catch( Exception ex ) {
exceptions.add(ex);
}
@@ -174,13 +174,12 @@
} );
thread.start();
- LOG.info( "subscribed " + i + " of 100" );
}
Thread.sleep(5000);
broker.stop();
broker = createBroker(false);
- Thread.sleep(5000);
+ Thread.sleep(10000);
assertEquals(0, exceptions.size());
}