This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new ddfb36515 AMQ-9504 - Prevent registering duplicate mKahadb adapters
ddfb36515 is described below
commit ddfb36515c0e9588d2e322365f56a3f53fb094ad
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed May 22 09:22:01 2024 -0400
AMQ-9504 - Prevent registering duplicate mKahadb adapters
This fixes an issue on start up of a broker that is configured with
multiple mKahaDB filtered adapters and one is configured with
perDestination=true. Before this fix a duplicate persistence adapter
could be created because the filter did not check for existing matches.
Patch applied with thanks to Ritesh Adval
---
.../kahadb/MultiKahaDBPersistenceAdapter.java | 19 +--
.../MultiKahaDBMultipleFilteredAdapterTest.java | 157 +++++++++++++++++++++
2 files changed, 168 insertions(+), 8 deletions(-)
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index d852c1525..3f33d48f6 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
import javax.transaction.xa.Xid;
import org.apache.activemq.broker.BrokerService;
@@ -88,7 +89,7 @@ public class MultiKahaDBPersistenceAdapter extends
LockableServiceSupport implem
};
final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
- List<PersistenceAdapter> adapters = new
CopyOnWriteArrayList<PersistenceAdapter>();
+ List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<>();
private File directory = new File(IOHelper.getDefaultDataDirectory() +
File.separator + "mKahaDB");
MultiKahaDBTransactionStore transactionStore = new
MultiKahaDBTransactionStore(this);
@@ -383,16 +384,18 @@ public class MultiKahaDBPersistenceAdapter extends
LockableServiceSupport implem
}
private void
findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template)
throws IOException {
- FileFilter destinationNames = new FileFilter() {
- @Override
- public boolean accept(File file) {
- return file.getName().startsWith("queue#") ||
file.getName().startsWith("topic#");
- }
- };
+ FileFilter destinationNames = file ->
+ file.getName().startsWith("queue#") ||
file.getName().startsWith("topic#");
+
File[] candidates =
template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
if (candidates != null) {
+ Set<File> existing =
adapters.stream().map(PersistenceAdapter::getDirectory).collect(
+ Collectors.toSet());
for (File candidate : candidates) {
- registerExistingAdapter(template, candidate);
+ if(!existing.contains(candidate)) {
+ LOG.debug("Adapter does not exist for dir: {} so will
register it", candidate);
+ registerExistingAdapter(template, candidate);
+ }
}
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java
new file mode 100644
index 000000000..0ff9e9f2d
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java
@@ -0,0 +1,157 @@
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MultiKahaDBMultipleFilteredAdapterTest {
+
+ private final static int maxFileLength = 1024*1024*32;
+ private static final String QUEUE_NAME =
"QUEUE.amqMultiKahadbMultiFilteredAdapter";
+ private static final String TOPIC_NAME =
"TOPIC.amqMultiKahadbMultiFilteredAdapter";
+ private static final int MESSAGE_COUNT = 100;
+
+ private BrokerService broker;
+ private ActiveMQConnectionFactory cf;
+
+ @Before
+ public void setUp() throws Exception {
+ prepareBrokerWithMultiStore(true);
+ TransportConnector connector =
broker.addConnector("tcp://localhost:0");
+ broker.start();
+ broker.waitUntilStarted();
+ cf = new ActiveMQConnectionFactory("failover://(" +
connector.getConnectUri() + ")");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ broker.stop();
+ }
+
+ @Test
+ public void testTopicWildcardAndPerDestinationFilteredAdapter() throws
Exception {
+ //create one topic and queue and send messages
+ sendMessages(false);
+ sendMessages(true);
+
+ //the adapters will persist the data, topic will be by wildcard
filtered adapter
+ //and queue will be by per destionation adapter
+
+ //stop broker and restart, the bug is that on restart wildcard
filtered adapter adds one KahaDBPersistenceAdapter
+ // when calling
MultiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters
+ //and on the call to doStart of MultiKahaDBPersistenceAdapter it uses
per destination filtered adapter to add
+ //another KahaDBPersistenceAdapter and fails on jmx bean registration
+ // the fix is to make sure to check the persistent adapter already
exists or not when we do per destination
+ // filtered adapter processing and only add if one does not exists
already
+ var multiKaha = (MultiKahaDBPersistenceAdapter)
broker.getPersistenceAdapter();
+ // Verify 2 adapters created and not 3
+ assertEquals(2, multiKaha.getAdapters().size());
+ Set<File> dirs = multiKaha.getAdapters().stream().map(adapter ->
adapter.getDirectory()).collect(
+ Collectors.toSet());
+ broker.stop();
+ broker.waitUntilStopped();
+
+ //restart should succeed with fix
+ prepareBrokerWithMultiStore(false);
+ broker.start();
+ broker.waitUntilStarted();
+
+ // Verify 2 adapters created and not 3 and same dirs
+ multiKaha = (MultiKahaDBPersistenceAdapter)
broker.getPersistenceAdapter();
+ assertEquals(2, multiKaha.getAdapters().size());
+ assertEquals(dirs, multiKaha.getAdapters().stream().map(adapter ->
adapter.getDirectory()).collect(
+ Collectors.toSet()));
+ }
+
+ private void sendMessages(boolean isTopic) throws Exception {
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession();
+ MessageProducer producer = isTopic ? session.createProducer(session
+ .createTopic(TOPIC_NAME)) : session.createProducer(session
+ .createQueue(QUEUE_NAME));
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ TextMessage message = session
+ .createTextMessage("Test message " + i);
+ producer.send(message);
+ }
+
+ producer.close();
+ session.close();
+ connection.close();
+
+ if(!isTopic) {
+ assertQueueLength(MESSAGE_COUNT);
+ }
+ }
+
+ private void assertQueueLength(int len) throws Exception, IOException {
+ Set<org.apache.activemq.broker.region.Destination> destinations =
broker.getBroker().getDestinations(
+ new ActiveMQQueue(QUEUE_NAME));
+ org.apache.activemq.broker.region.Queue queue = (Queue)
destinations.iterator().next();
+ assertEquals(len, queue.getMessageStore().getMessageCount());
+ }
+
+ protected BrokerService createBroker(PersistenceAdapter kaha) throws
Exception {
+ BrokerService broker = new BrokerService();
+ broker.setUseJmx(true);
+ broker.setBrokerName("localhost");
+ broker.setPersistenceAdapter(kaha);
+ return broker;
+ }
+
+ protected KahaDBPersistenceAdapter createStore(boolean delete) throws
IOException {
+ KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+ kaha.setJournalMaxFileLength(maxFileLength);
+ kaha.setCleanupInterval(5000);
+ if (delete) {
+ kaha.deleteAllMessages();
+ }
+ return kaha;
+ }
+
+ public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws
Exception {
+ MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new
MultiKahaDBPersistenceAdapter();
+ if (deleteAllMessages) {
+ multiKahaDBPersistenceAdapter.deleteAllMessages();
+ }
+ ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new
ArrayList<>();
+
+ //have a topic wildcard filtered adapter
+ FilteredKahaDBPersistenceAdapter adapter1 = new
FilteredKahaDBPersistenceAdapter();
+ adapter1.setPersistenceAdapter(createStore(deleteAllMessages));
+ adapter1.setTopic(">");
+ adapters.add(adapter1);
+
+ //have another per destination filtered adapter
+ FilteredKahaDBPersistenceAdapter adapter2 = new
FilteredKahaDBPersistenceAdapter();
+ adapter2.setPersistenceAdapter(createStore(deleteAllMessages));
+ adapter2.setPerDestination(true);
+ adapters.add(adapter2);
+
+ multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+ broker = createBroker(multiKahaDBPersistenceAdapter);
+ }
+}