Author: tabish
Date: Tue May 22 19:47:23 2012
New Revision: 1341601
URL: http://svn.apache.org/viewvc?rev=1341601&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-3841
Ensure that the mKahaDB cleans up the per-destination kahaDB data when the
destination is deleted, and don't throw exceptions in cases where we find an
older one that has no destinations in it any longer.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java?rev=1341601&r1=1341600&r2=1341601&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
Tue May 22 19:47:23 2012
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@@ -176,6 +177,16 @@ public class MultiKahaDBPersistenceAdapt
}
}
+ private void stopAdapter(KahaDBPersistenceAdapter
kahaDBPersistenceAdapter, String destination) {
+ try {
+ kahaDBPersistenceAdapter.stop();
+ } catch (Exception e) {
+ RuntimeException detail = new RuntimeException("Failed to stop per
destination persistence adapter for destination: " + destination + ", options:"
+ adapters, e);
+ LOG.error(detail.toString(), e);
+ throw detail;
+ }
+ }
+
public TopicMessageStore createTopicMessageStore(ActiveMQTopic
destination) throws IOException {
PersistenceAdapter persistenceAdapter =
getMatchingPersistenceAdapter(destination);
return
transactionStore.proxy(persistenceAdapter.createTransactionStore(),
persistenceAdapter.createTopicMessageStore(destination));
@@ -218,11 +229,38 @@ public class MultiKahaDBPersistenceAdapt
}
public void removeQueueMessageStore(ActiveMQQueue destination) {
-
getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
+ PersistenceAdapter adapter =
getMatchingPersistenceAdapter(destination);
+ adapter.removeQueueMessageStore(destination);
+ if (adapter instanceof KahaDBPersistenceAdapter) {
+ adapter.removeQueueMessageStore(destination);
+ removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
+ }
}
public void removeTopicMessageStore(ActiveMQTopic destination) {
-
getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
+ PersistenceAdapter adapter =
getMatchingPersistenceAdapter(destination);
+ if (adapter instanceof KahaDBPersistenceAdapter) {
+ adapter.removeTopicMessageStore(destination);
+ removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
+ }
+ }
+
+ private void removeMessageStore(KahaDBPersistenceAdapter adapter,
ActiveMQDestination destination) {
+ if (adapter.getDestinations().isEmpty()) {
+ stopAdapter(adapter, destination.toString());
+ File adapterDir = adapter.getDirectory();
+ if (adapterDir != null) {
+ if (IOHelper.deleteFile(adapterDir)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.info("deleted per destination adapter directory
for: " + destination);
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.info("failed to deleted per destination adapter
directory for: " + destination);
+ }
+ }
+ }
+ }
}
public void rollbackTransaction(ConnectionContext context) throws
IOException {
@@ -280,7 +318,11 @@ public class MultiKahaDBPersistenceAdapt
private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter
filteredAdapter, File candidate) {
KahaDBPersistenceAdapter adapter =
adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
candidate.getName());
startAdapter(adapter, candidate.getName());
- registerAdapter(adapter, adapter.getDestinations().toArray(new
ActiveMQDestination[]{})[0]);
+ if (adapter.getDestinations().size() != 0) {
+ registerAdapter(adapter, adapter.getDestinations().toArray(new
ActiveMQDestination[]{})[0]);
+ } else {
+ stopAdapter(adapter, candidate.getName());
+ }
}
private FilteredKahaDBPersistenceAdapter
addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter,
ActiveMQDestination destination) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java?rev=1341601&r1=1341600&r2=1341601&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
Tue May 22 19:47:23 2012
@@ -85,6 +85,10 @@ public class AMQ3841Test {
prepareBrokerWithMultiStore(false);
broker.start();
+
+ broker.getAdminView().addQueue(destinationName);
+ assertNotNull(broker.getDestination(new
ActiveMQQueue(destinationName)));
+
}
protected KahaDBPersistenceAdapter createStore(boolean delete) throws
IOException {