Repository: activemq Updated Branches: refs/heads/master e0c2c177c -> a439a0c6b
https://issues.apache.org/jira/browse/AMQ-5875 Fixing a regression that caused a network bridge to recreate durable demand improperly. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2117768e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2117768e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2117768e Branch: refs/heads/master Commit: 2117768e0a6c7bab0225f5ba4e960bfb443188c7 Parents: e0c2c17 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Fri Jul 10 16:43:26 2015 +0000 Committer: gtully <[email protected]> Committed: Fri Jul 17 16:34:06 2015 +0100 ---------------------------------------------------------------------- .../activemq/network/DurableConduitBridge.java | 23 ++- .../kahadb/AbstractMultiKahaDBDeletionTest.java | 202 +++++++++++++++++++ .../kahadb/MultiKahaDBQueueDeletionTest.java | 91 +++++++++ .../kahadb/MultiKahaDBTopicDeletionTest.java | 176 ++-------------- 4 files changed, 326 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 621972c..afcb42d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -18,6 +18,7 @@ package org.apache.activemq.network; import java.io.IOException; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory; public class DurableConduitBridge extends ConduitBridge { private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); + @Override public String toString() { return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName(); } @@ -52,6 +54,7 @@ public class DurableConduitBridge extends ConduitBridge { * Subscriptions for these destinations are always created * */ + @Override protected void setupStaticDestinations() { super.setupStaticDestinations(); ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; @@ -60,11 +63,22 @@ public class DurableConduitBridge extends ConduitBridge { if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { DemandSubscription sub = createDemandSubscription(dest); sub.setStaticallyIncluded(true); - if (dest.isTopic()) { - sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); - } try { - addSubscription(sub); + //Filtering by non-empty subscriptions, see AMQ-5875 + if (dest.isTopic()) { + sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); + for (Subscription subscription : this.getRegionSubscriptions(dest)) { + String clientId = subscription.getContext().getClientId(); + String subName = subscription.getConsumerInfo().getSubscriptionName(); + if (clientId != null && clientId.equals(sub.getLocalInfo().getClientId()) + && subName != null && subName.equals(sub.getLocalInfo().getSubscriptionName())) { + addSubscription(sub); + break; + } + } + } else { + addSubscription(sub); + } } catch (IOException e) { LOG.error("Failed to add static destination {}", dest, e); } @@ -74,6 +88,7 @@ public class DurableConduitBridge extends ConduitBridge { } } + @Override protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { if (addToAlreadyInterestedConsumers(info)) { return null; // don't want this subscription added http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java new file mode 100644 index 0000000..e1ba0ff --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class AbstractMultiKahaDBDeletionTest { + protected static final Logger LOG = LoggerFactory + .getLogger(MultiKahaDBTopicDeletionTest.class); + + protected BrokerService brokerService; + protected Broker broker; + protected URI brokerConnectURI; + protected File storeDir; + protected ActiveMQDestination dest1; + protected ActiveMQDestination dest2; + + public AbstractMultiKahaDBDeletionTest(ActiveMQDestination dest1, ActiveMQDestination dest2) { + this.dest1 = dest1; + this.dest2 = dest2; + } + + @Rule + public TemporaryFolder tempTestDir = new TemporaryFolder(); + + @Before + public void startBroker() throws Exception { + setUpBroker(true); + } + + protected void setUpBroker(boolean clearDataDir) throws Exception { + brokerService = new BrokerService(); + this.initPersistence(brokerService); + // set up a transport + TransportConnector connector = brokerService + .addConnector(new TransportConnector()); + connector.setUri(new URI("tcp://0.0.0.0:0")); + connector.setName("tcp"); + + brokerService.start(); + brokerService.waitUntilStarted(); + brokerConnectURI = brokerService.getConnectorByName("tcp") + .getConnectUri(); + broker = brokerService.getBroker(); + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + protected void initPersistence(BrokerService brokerService) + throws IOException { + storeDir = tempTestDir.getRoot(); + brokerService.setPersistent(true); + + // setup multi-kaha adapter + MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(storeDir); + + KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); + kahaStore.setJournalMaxFileLength(1024 * 512); + + // set up a store per destination + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(kahaStore); + filtered.setPerDestination(true); + List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>(); + stores.add(filtered); + + persistenceAdapter.setFilteredPersistenceAdapters(stores); + brokerService.setPersistenceAdapter(persistenceAdapter); + } + + /** + * Test that a destination can be deleted and the other destination can still be subscribed to + * @throws Exception + */ + @Test + public void testDest1Deletion() throws Exception { + LOG.info("Creating {} first, {} second", dest1, dest2); + LOG.info("Removing {}, subscribing to {}", dest1, dest2); + + // Create two destinations + broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false); + broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false); + + // remove destination2 + broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100); + + // try and create a consumer on dest2, before AMQ-5875 this + //would cause an IllegalStateException for Topics + createConsumer(dest2); + Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter()); + assertTrue("Store index should still exist", storeFiles.size() >= 1); + } + + + @Test + public void testDest2Deletion() throws Exception { + LOG.info("Creating {} first, {} second", dest1, dest2); + LOG.info("Removing {}, subscribing to {}", dest2, dest1); + + // Create two destinations + broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false); + broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false); + + // remove destination2 + broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100); + + // try and create a consumer on dest1, before AMQ-5875 this + //would cause an IllegalStateException for Topics + createConsumer(dest1); + Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter()); + assertTrue("Store index should still exist", storeFiles.size() >= 1); + } + + + @Test + public void testStoreCleanupDeleteDest1First() throws Exception { + LOG.info("Creating {} first, {} second", dest1, dest2); + LOG.info("Deleting {} first, {} second", dest1, dest2); + + // Create two destinations + broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false); + broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false); + + // remove both destinations + broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100); + broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100); + + //Assert that with no more destinations attached to a store that it has been cleaned up + Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter()); + assertEquals("Store files should be deleted", 0, storeFiles.size()); + + } + + @Test + public void testStoreCleanupDeleteDest2First() throws Exception { + LOG.info("Creating {} first, {} second", dest1, dest2); + LOG.info("Deleting {} first, {} second", dest2, dest1); + + // Create two destinations + broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false); + broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false); + + // remove both destinations + broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100); + broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100); + + //Assert that with no more destinations attached to a store that it has been cleaned up + Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter()); + assertEquals("Store files should be deleted", 0, storeFiles.size()); + + } + + + protected abstract void createConsumer(ActiveMQDestination dest) throws JMSException; + + protected abstract WildcardFileFilter getStoreFileFilter(); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java new file mode 100644 index 0000000..f4499f3 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AMQ-5875 + * + * This test shows that when multiple destinations share a single KahaDB + * instance when using mKahaDB, that the deletion of one Queue will not cause + * the store to be deleted if another destination is still attached. This + * issue was related to Topics but this test makes sure Queues work as well. + * + * */ +@RunWith(Parameterized.class) +public class MultiKahaDBQueueDeletionTest extends AbstractMultiKahaDBDeletionTest { + + protected static final Logger LOG = LoggerFactory + .getLogger(MultiKahaDBTopicDeletionTest.class); + + protected static ActiveMQQueue QUEUE1 = new ActiveMQQueue("test.>"); + protected static ActiveMQQueue QUEUE2 = new ActiveMQQueue("test.t.queue"); + + @Parameters + public static Collection<Object[]> data() { + + //Test with queues created in different orders + return Arrays.asList(new Object[][] { + {QUEUE1, QUEUE2}, + {QUEUE2, QUEUE1} + }); + } + + public MultiKahaDBQueueDeletionTest(ActiveMQQueue dest1, ActiveMQQueue dest2) { + super(dest1, dest2); + } + + /* (non-Javadoc) + * @see org.apache.activemq.store.kahadb.AbstractMultiKahaDBDeletionTest#createConsumer(org.apache.activemq.command.ActiveMQDestination) + */ + @Override + protected void createConsumer(ActiveMQDestination dest) throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + brokerConnectURI); + Connection connection = factory.createConnection(); + connection.setClientID("client1"); + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + session.createConsumer(dest); + } + + /* (non-Javadoc) + * @see org.apache.activemq.store.kahadb.AbstractMultiKahaDBDeletionTest#getStoreFileFilter() + */ + @Override + protected WildcardFileFilter getStoreFileFilter() { + return new WildcardFileFilter("queue*"); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java index 4380f5a..02f20c5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java @@ -16,32 +16,18 @@ */ package org.apache.activemq.store.kahadb; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.List; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; +import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.WildcardFileFilter; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -58,21 +44,13 @@ import org.slf4j.LoggerFactory; * * */ @RunWith(Parameterized.class) -public class MultiKahaDBTopicDeletionTest { +public class MultiKahaDBTopicDeletionTest extends AbstractMultiKahaDBDeletionTest { protected static final Logger LOG = LoggerFactory .getLogger(MultiKahaDBTopicDeletionTest.class); - protected BrokerService brokerService; - protected Broker broker; - protected URI brokerConnectURI; - protected File storeDir; - protected ActiveMQTopic topic1; - protected ActiveMQTopic topic2; - protected static ActiveMQTopic TOPIC1 = new ActiveMQTopic("test.>"); protected static ActiveMQTopic TOPIC2 = new ActiveMQTopic("test.t.topic"); - @Parameters public static Collection<Object[]> data() { @@ -83,144 +61,13 @@ public class MultiKahaDBTopicDeletionTest { }); } - public MultiKahaDBTopicDeletionTest(ActiveMQTopic topic1, ActiveMQTopic topic2) { - this.topic1 = topic1; - this.topic2 = topic2; - } - - @Rule - public TemporaryFolder tempTestDir = new TemporaryFolder(); - - @Before - public void startBroker() throws Exception { - setUpBroker(true); - } - - protected void setUpBroker(boolean clearDataDir) throws Exception { - brokerService = new BrokerService(); - this.initPersistence(brokerService); - // set up a transport - TransportConnector connector = brokerService - .addConnector(new TransportConnector()); - connector.setUri(new URI("tcp://0.0.0.0:0")); - connector.setName("tcp"); - - brokerService.start(); - brokerService.waitUntilStarted(); - brokerConnectURI = brokerService.getConnectorByName("tcp") - .getConnectUri(); - broker = brokerService.getBroker(); - } - - @After - public void stopBroker() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - protected void initPersistence(BrokerService brokerService) - throws IOException { - storeDir = tempTestDir.getRoot(); - brokerService.setPersistent(true); - - // setup multi-kaha adapter - MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); - persistenceAdapter.setDirectory(storeDir); - - KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); - kahaStore.setJournalMaxFileLength(1024 * 512); - - // set up a store per destination - FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); - filtered.setPersistenceAdapter(kahaStore); - filtered.setPerDestination(true); - List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>(); - stores.add(filtered); - - persistenceAdapter.setFilteredPersistenceAdapters(stores); - brokerService.setPersistenceAdapter(persistenceAdapter); - } - - /** - * Test that a topic can be deleted and the other topic can still be subscribed to - * @throws Exception - */ - @Test - public void testTopic1Deletion() throws Exception { - LOG.info("Creating {} first, {} second", topic1, topic2); - LOG.info("Removing {}, subscribing to {}", topic1, topic2); - - // Create two topics - broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false); - broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false); - - // remove topic2 - broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100); - - // try and create a subscription on topic2, before AMQ-5875 this - //would cause an IllegalStateException - createSubscriber(topic2); - } - - - @Test - public void testTopic2Deletion() throws Exception { - LOG.info("Creating {} first, {} second", topic1, topic2); - LOG.info("Removing {}, subscribing to {}", topic2, topic1); - - // Create two topics - broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false); - broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false); - - // remove topic2 - broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100); - - // try and create a subscription on topic1, before AMQ-5875 this - //would cause an IllegalStateException - createSubscriber(topic1); - } - - - @Test - public void testStoreCleanupDeleteTopic1First() throws Exception { - LOG.info("Creating {} first, {} second", topic1, topic2); - LOG.info("Deleting {} first, {} second", topic1, topic2); - - // Create two topics - broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false); - broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false); - - // remove both topics - broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100); - broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100); - - //Assert that with no more destinations attached to a store that it has been cleaned up - Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*")); - assertEquals("Store files should be deleted", 0, storeFiles.size()); - - } - - @Test - public void testStoreCleanupDeleteTopic2First() throws Exception { - LOG.info("Creating {} first, {} second", topic1, topic2); - LOG.info("Deleting {} first, {} second", topic2, topic1); - - // Create two topics - broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false); - broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false); - - // remove both topics - broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100); - broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100); - - //Assert that with no more destinations attached to a store that it has been cleaned up - Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*")); - assertEquals("Store files should be deleted", 0, storeFiles.size()); - + public MultiKahaDBTopicDeletionTest(ActiveMQTopic dest1, + ActiveMQTopic dest2) { + super(dest1, dest2); } - - protected void createSubscriber(ActiveMQTopic topic) throws JMSException { + @Override + protected void createConsumer(ActiveMQDestination dest) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( brokerConnectURI); Connection connection = factory.createConnection(); @@ -228,7 +75,12 @@ public class MultiKahaDBTopicDeletionTest { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "sub1"); + session.createDurableSubscriber((Topic) dest, "sub1"); + } + + @Override + protected WildcardFileFilter getStoreFileFilter() { + return new WildcardFileFilter("topic*"); } }
