This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new ddfb36515 AMQ-9504 - Prevent registering duplicate mKahadb adapters
ddfb36515 is described below

commit ddfb36515c0e9588d2e322365f56a3f53fb094ad
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed May 22 09:22:01 2024 -0400

    AMQ-9504 - Prevent registering duplicate mKahadb adapters
    
    This fixes an issue on start up of a broker that is configured with
    multiple mKahaDB filtered adapters and one is configured with
    perDestination=true. Before this fix a duplicate persistence adapter
    could be created because the filter did not check for existing matches.
    
    Patch applied with thanks to Ritesh Adval
---
 .../kahadb/MultiKahaDBPersistenceAdapter.java      |  19 +--
 .../MultiKahaDBMultipleFilteredAdapterTest.java    | 157 +++++++++++++++++++++
 2 files changed, 168 insertions(+), 8 deletions(-)

diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index d852c1525..3f33d48f6 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import java.util.stream.Collectors;
 import javax.transaction.xa.Xid;
 
 import org.apache.activemq.broker.BrokerService;
@@ -88,7 +89,7 @@ public class MultiKahaDBPersistenceAdapter extends 
LockableServiceSupport implem
     };
     final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
 
-    List<PersistenceAdapter> adapters = new 
CopyOnWriteArrayList<PersistenceAdapter>();
+    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<>();
     private File directory = new File(IOHelper.getDefaultDataDirectory() + 
File.separator + "mKahaDB");
 
     MultiKahaDBTransactionStore transactionStore = new 
MultiKahaDBTransactionStore(this);
@@ -383,16 +384,18 @@ public class MultiKahaDBPersistenceAdapter extends 
LockableServiceSupport implem
     }
 
     private void 
findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) 
throws IOException {
-        FileFilter destinationNames = new FileFilter() {
-            @Override
-            public boolean accept(File file) {
-                return file.getName().startsWith("queue#") || 
file.getName().startsWith("topic#");
-            }
-        };
+        FileFilter destinationNames = file ->
+            file.getName().startsWith("queue#") || 
file.getName().startsWith("topic#");
+
         File[] candidates = 
template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
         if (candidates != null) {
+            Set<File> existing = 
adapters.stream().map(PersistenceAdapter::getDirectory).collect(
+                Collectors.toSet());
             for (File candidate : candidates) {
-                registerExistingAdapter(template, candidate);
+                if(!existing.contains(candidate)) {
+                    LOG.debug("Adapter does not exist for dir: {} so will 
register it", candidate);
+                    registerExistingAdapter(template, candidate);
+                }
             }
         }
     }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java
new file mode 100644
index 000000000..0ff9e9f2d
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MultiKahaDBMultipleFilteredAdapterTest.java
@@ -0,0 +1,157 @@
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MultiKahaDBMultipleFilteredAdapterTest {
+
+    private final static int maxFileLength = 1024*1024*32;
+    private static final String QUEUE_NAME = 
"QUEUE.amqMultiKahadbMultiFilteredAdapter";
+    private static final String TOPIC_NAME = 
"TOPIC.amqMultiKahadbMultiFilteredAdapter";
+    private static final int MESSAGE_COUNT = 100;
+
+    private  BrokerService broker;
+    private ActiveMQConnectionFactory cf;
+
+    @Before
+    public void setUp() throws Exception {
+        prepareBrokerWithMultiStore(true);
+        TransportConnector connector = 
broker.addConnector("tcp://localhost:0");
+        broker.start();
+        broker.waitUntilStarted();
+        cf = new ActiveMQConnectionFactory("failover://(" + 
connector.getConnectUri() + ")");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void testTopicWildcardAndPerDestinationFilteredAdapter() throws 
Exception {
+        //create one topic and queue and send messages
+        sendMessages(false);
+        sendMessages(true);
+
+        //the adapters will persist the data, topic will be by wildcard 
filtered adapter
+        //and queue will be by per destionation adapter
+
+        //stop broker and restart, the bug is that on restart wildcard 
filtered adapter adds one KahaDBPersistenceAdapter
+        // when calling 
MultiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters
+        //and on the call to doStart of MultiKahaDBPersistenceAdapter it uses 
per destination filtered adapter to add
+        //another KahaDBPersistenceAdapter and fails on jmx bean registration
+        // the fix is to make sure to check the persistent adapter already 
exists or not when we do per destination
+        // filtered adapter processing and only add if one does not exists 
already
+        var multiKaha = (MultiKahaDBPersistenceAdapter) 
broker.getPersistenceAdapter();
+        // Verify 2 adapters created and not 3
+        assertEquals(2, multiKaha.getAdapters().size());
+        Set<File> dirs = multiKaha.getAdapters().stream().map(adapter -> 
adapter.getDirectory()).collect(
+            Collectors.toSet());
+        broker.stop();
+        broker.waitUntilStopped();
+
+        //restart should succeed with fix
+        prepareBrokerWithMultiStore(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        // Verify 2 adapters created and not 3 and same dirs
+        multiKaha = (MultiKahaDBPersistenceAdapter) 
broker.getPersistenceAdapter();
+        assertEquals(2, multiKaha.getAdapters().size());
+        assertEquals(dirs, multiKaha.getAdapters().stream().map(adapter -> 
adapter.getDirectory()).collect(
+            Collectors.toSet()));
+    }
+
+    private void sendMessages(boolean isTopic) throws Exception {
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession();
+        MessageProducer producer = isTopic ? session.createProducer(session
+                .createTopic(TOPIC_NAME)) : session.createProducer(session
+                .createQueue(QUEUE_NAME));
+
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            TextMessage message = session
+                    .createTextMessage("Test message " + i);
+            producer.send(message);
+        }
+
+        producer.close();
+        session.close();
+        connection.close();
+
+        if(!isTopic) {
+            assertQueueLength(MESSAGE_COUNT);
+        }
+    }
+
+    private void assertQueueLength(int len) throws Exception, IOException {
+        Set<org.apache.activemq.broker.region.Destination> destinations = 
broker.getBroker().getDestinations(
+                new ActiveMQQueue(QUEUE_NAME));
+        org.apache.activemq.broker.region.Queue queue = (Queue) 
destinations.iterator().next();
+        assertEquals(len, queue.getMessageStore().getMessageCount());
+    }
+
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setBrokerName("localhost");
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws 
IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(5000);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws 
Exception {
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new 
MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new 
ArrayList<>();
+
+        //have a topic wildcard filtered adapter
+        FilteredKahaDBPersistenceAdapter adapter1 = new 
FilteredKahaDBPersistenceAdapter();
+        adapter1.setPersistenceAdapter(createStore(deleteAllMessages));
+        adapter1.setTopic(">");
+        adapters.add(adapter1);
+
+        //have another per destination filtered adapter
+        FilteredKahaDBPersistenceAdapter adapter2 = new 
FilteredKahaDBPersistenceAdapter();
+        adapter2.setPersistenceAdapter(createStore(deleteAllMessages));
+        adapter2.setPerDestination(true);
+        adapters.add(adapter2);
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+}

Reply via email to