This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 289750d  AMQ-7308 - ensure kahadb message add does not auto create the message store in error, expect an existing store. fix and test
289750d is described below

commit 289750d7c9849bd26f19e9116457eb72a3412d05
Author: gtully <gary.tu...@gmail.com>
AuthorDate: Fri Sep 20 10:22:56 2019 +0100

    AMQ-7308 - ensure kahadb message add does not auto create the message store in error, expect an existing store. fix and test
---
 .../store/PersistenceAdapterTestSupport.java       |   1 +
 .../activemq/store/kahadb/MessageDatabase.java     |  25 ++-
 .../store/kahadb/MessageDatabaseSizeTest.java      |   2 +
 .../VirtualTopicConcurrentSendDeleteTest.java      | 177 +++++++++++++++++++++
 4 files changed, 203 insertions(+), 2 deletions(-)

diff --git a/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java b/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
index 16d46e1..39a63ae 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
@@ -57,6 +57,7 @@ abstract public class PersistenceAdapterTestSupport extends TestCase {
 
         
         MessageStore ms = pa.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        ms.start();
         ConnectionContext context = new ConnectionContext();
 
         ActiveMQTextMessage message = new ActiveMQTextMessage();
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 1a120a2..ac8ea48 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1205,6 +1205,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      */
     void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
+            initMessageStore(data);
             process(data, location, (IndexAware) null);
         } else {
             // just recover producer audit
@@ -1217,6 +1218,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    private void initMessageStore(JournalCommand<?> data) throws IOException {
+        data.visit(new Visitor() {
+            @Override
+            public void visit(KahaAddMessageCommand command) throws IOException {
+                final KahaDestination destination = command.getDestination();
+                if (!storedDestinations.containsKey(key(destination))) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        @Override
+                        public void execute(Transaction tx) throws IOException {
+                            getStoredDestination(destination, tx);
+                        }
+                    });
+                }
+            }
+        });
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Journaled record processing methods. Once the record is journaled,
     // these methods handle applying the index updates. These may be called
@@ -1486,8 +1504,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
 
     long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
-        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-
+        StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx);
+        if (sd == null) {
+            // if the store no longer exists, skip
+            return -1;
+        }
         // Skip adding the message to the index if this is a topic and there are
         // no subscriptions.
         if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
index 4deb1e0..82d4018 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
@@ -114,6 +114,7 @@ public class MessageDatabaseSizeTest {
 
         //Add a single message and update once so we can compare the size consistently
         MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.start();
         messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
         messageStore.updateMessage(textMessage);
 
@@ -134,6 +135,7 @@ public class MessageDatabaseSizeTest {
 
         //Add a single message and update once so we can compare the size consistently
         MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.start();
         messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
         textMessage.setText("new size of message");
         messageStore.updateMessage(textMessage);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java
new file mode 100644
index 0000000..d4be5ae
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class VirtualTopicConcurrentSendDeleteTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicConcurrentSendDeleteTest.class);
+
+    BrokerService brokerService;
+    ConnectionFactory connectionFactory;
+
+    @Before
+    public void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    public void createBroker(boolean delete) throws Exception  {
+        brokerService = new BrokerService();
+        //brokerService.setPersistent(false);
+        brokerService.setDeleteAllMessagesOnStartup(delete);
+        brokerService.setAdvisorySupport(false);
+        ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+        brokerService.start();
+
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        activeMQConnectionFactory.setWatchTopicAdvisories(false);
+        activeMQConnectionFactory.setAlwaysSyncSend(false);
+        ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
+        zeroPrefetch.setAll(0);
+        activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
+        connectionFactory = activeMQConnectionFactory;
+    }
+
+    @After
+    public void stopBroker() throws Exception  {
+        brokerService.stop();
+    }
+
+    @Test
+    public void testConsumerQueueDeleteOk() throws Exception {
+
+        final int numConnections = 1;
+        final int numDestinations = 10;
+        final int numMessages = 4000;
+
+        ExecutorService executorService = Executors.newFixedThreadPool(numConnections * 2);
+
+        brokerService.getRegionBroker().addDestination(
+                brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST"), false);
+
+        // precreate dests to accentuate read access
+        for (int i=0; i<numDestinations; i++ ) {
+            brokerService.getRegionBroker().addDestination(
+                    brokerService.getAdminConnectionContext(),
+                    new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"),
+                    false);
+        }
+
+        final CountDownLatch doneOne = new CountDownLatch(numConnections);
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+
+                try {
+                    int messagestoSend = 0;
+
+                    Connection connection1 = connectionFactory.createConnection();
+                    connection1.start();
+
+                    Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(null);
+
+                    do {
+                        producer.send(new ActiveMQTopic("VirtualTopic.TEST"), new ActiveMQMessage());
+                        messagestoSend++;
+
+                        if (messagestoSend == 1000) {
+                            doneOne.countDown();
+                        }
+                    } while (messagestoSend < numMessages);
+
+                    connection1.close();
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        for (int i = 0; i < numConnections; i++) {
+            executorService.execute(runnable);
+        }
+
+        // delete all of the consumer queues
+        final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
+
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        doneOne.await(30, TimeUnit.SECONDS);
+
+                        // delete in reverse to clash with send in forward direction
+                        for (int i=numDestinations-1; i>=0; i--) {
+                            final ActiveMQQueue toDelete = new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST");
+
+                            ObjectName queueViewMBeanName = new ObjectName(prefix + toDelete.getQueueName());
+                            QueueViewMBean proxy = (QueueViewMBean)
+                                    brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+                            LOG.info("Q len: " + toDelete.getQueueName() + ", " + proxy.getQueueSize());
+                            brokerService.getAdminView().removeQueue(toDelete.getPhysicalName());
+
+                            TimeUnit.MILLISECONDS.sleep(100);
+                        }
+
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                }
+            });
+
+
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.MINUTES);
+
+        LOG.info("Enqueues: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        final int numQueues = ((RegionBroker)brokerService.getRegionBroker()).getQueueRegion().getDestinationMap().size();
+        LOG.info("Destinations: " + numQueues );
+
+        assertEquals("no queues left", 0, numQueues);
+
+        // the bug
+        assertEquals("no queues, just one topic, in kahadb", 1, brokerService.getPersistenceAdapter().getDestinations().size());
+    }
+}

Reply via email to