Repository: activemq-cli-tools
Updated Branches:
  refs/heads/master 5b8a4b93c -> d297503ef


AMQCLI-5 - Add support for exporting Topics

https://issues.apache.org/jira/browse/AMQCLI-5

Added basic support for exporting topics.  This will still need a bit of
polishing


Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/d297503e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/d297503e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/d297503e

Branch: refs/heads/master
Commit: d297503ef3079af2b79aee75cd67fab3579afb9f
Parents: 5b8a4b9
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Wed Feb 15 10:49:24 2017 -0500
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Wed Feb 15 10:49:24 2017 -0500

----------------------------------------------------------------------
 .../cli/kahadb/exporter/KahaDBExporter.java     |  21 +--
 .../ArtemisXmlMessageRecoveryListener.java      |   8 +-
 .../artemis/OpenWireMessageTypeConverter.java   |  39 ++++-
 .../activemq/store/kahadb/KahaDBUtil.java       |  83 ++++++++++
 .../cli/kahadb/exporter/ExporterTest.java       | 153 +++++++++++++++++--
 5 files changed, 269 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
index c178a8c..9c12644 100644
--- 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
+++ 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
@@ -23,7 +23,6 @@ import java.util.stream.Collectors;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
@@ -72,20 +71,12 @@ public class KahaDBExporter implements MessageStoreExporter 
{
             final ActiveMQTopic topic = (ActiveMQTopic) destination;
             final TopicMessageStore messageStore = 
adapter.createTopicMessageStore(topic);
 
-            //recover subscriptions
-            //TODO: This will most likely run into the same message more than 
once if there is
-            //more than one durable sub on a topic so we should look at 
optimizing this
-            //Ideally we'd just recover all the messages once and then ask 
KahaDB which subscriptions
-            //have not acked the message.  This will probably require a new 
hook into KahaDB
-//            for (final SubscriptionInfo subscriptionInfo : 
messageStore.getAllSubscriptions()) {
-//
-//                try {
-//                    
messageStore.recoverSubscription(subscriptionInfo.getClientId(),
-//                            subscriptionInfo.getSubscriptionName(), 
recoveryListener);
-//                } catch (Exception e) {
-//                    IOExceptionSupport.create(e);
-//                }
-//            }
+            //recover topic
+            try {
+                messageStore.recover(recoveryListener);
+            } catch (Exception e) {
+                IOExceptionSupport.create(e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
index c2d04a2..93bd439 100644
--- 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
+++ 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
@@ -21,6 +21,7 @@ import org.apache.activemq.cli.schema.MessageType;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,15 +32,16 @@ public class ArtemisXmlMessageRecoveryListener implements 
MessageRecoveryListene
     static final Logger LOG = 
LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class);
 
     private final ArtemisJournalMarshaller xmlMarshaller;
-    private final OpenWireMessageTypeConverter converter = new 
OpenWireMessageTypeConverter();
-
+    private final OpenWireMessageTypeConverter converter;
 
     /**
      * @param file
      */
-    public ArtemisXmlMessageRecoveryListener(final ArtemisJournalMarshaller 
xmlMarshaller) {
+    public ArtemisXmlMessageRecoveryListener(final KahaDBStore store,
+            final ArtemisJournalMarshaller xmlMarshaller) {
         super();
         this.xmlMarshaller = xmlMarshaller;
+        this.converter = new OpenWireMessageTypeConverter(store);
     }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
index 259decc..c921b48 100644
--- 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
+++ 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
@@ -16,11 +16,10 @@
  */
 package org.apache.activemq.cli.kahadb.exporter.artemis;
 
-import javax.jms.JMSException;
-
 import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil;
 import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter;
 import org.apache.activemq.cli.schema.BodyType;
 import org.apache.activemq.cli.schema.MessageType;
@@ -30,10 +29,22 @@ import org.apache.activemq.cli.schema.QueueType;
 import org.apache.activemq.cli.schema.QueuesType;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.KahaDBUtil;
 
 public class OpenWireMessageTypeConverter implements 
OpenWireExportConverter<MessageType> {
 
-    final OpenWireMessageConverter converter = new 
OpenWireMessageConverter(new OpenWireFormat());
+    private final OpenWireMessageConverter converter = new 
OpenWireMessageConverter(new OpenWireFormat());
+    private final KahaDBStore store;
+
+
+    /**
+     * @param store
+     */
+    public OpenWireMessageTypeConverter(KahaDBStore store) {
+        super();
+        this.store = store;
+    }
 
     /* (non-Javadoc)
      * @see 
org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message)
@@ -66,11 +77,23 @@ public class OpenWireMessageTypeConverter implements 
OpenWireExportConverter<Mes
         return messageType;
     }
 
-    private QueuesType convertQueue(final Message message) throws JMSException 
{
-        return QueuesType.builder()
-                .withQueue(QueueType.builder()
-                        
.withName(message.getDestination().getPhysicalName()).build())
-            .build();
+    private QueuesType convertQueue(final Message message) throws Exception {
+        if (message.getDestination().isQueue()) {
+            return QueuesType.builder()
+                    .withQueue(QueueType.builder()
+                            
.withName(message.getDestination().getPhysicalName()).build())
+                .build();
+        } else {
+            final QueuesType.Builder<Void> queuesBuilder = 
QueuesType.builder();
+
+            KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> {
+                queuesBuilder.addQueue(QueueType.builder().withName(
+                        
ActiveMQDestination.createQueueNameForDurableSubscription(
+                        true, sub.getClientId(), 
sub.getSubcriptionName())).build());
+            });
+
+            return queuesBuilder.build();
+        }
     }
 
     private BodyType convertBody(final ServerMessage serverMessage) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
new file mode 100644
index 0000000..40ea71d
--- /dev/null
+++ 
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.LastAck;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+
+public class KahaDBUtil {
+
+
+    /**
+     * Return subscriptions which have not acked this message
+     *
+     * @param store
+     * @param message
+     * @return
+     * @throws Exception
+     */
+    public static List<SubscriptionInfo> getUnackedSubscriptions(KahaDBStore 
store, Message message)
+            throws Exception {
+
+        final List<SubscriptionInfo> matching = new ArrayList<>();
+
+        if (!message.getDestination().isTopic()) {
+            return matching;
+        }
+
+        ActiveMQTopic topic = (ActiveMQTopic) message.getDestination();
+        String messageId = message.getMessageId().toString();
+        TopicMessageStore messageStore =  store.createTopicMessageStore(topic);
+
+        store.indexLock.writeLock().lock();
+
+        final SubscriptionInfo[] infos = messageStore.getAllSubscriptions();
+
+        try {
+            store.pageFile.tx().execute(new Transaction.Closure<Exception>() {
+                @Override
+                public void execute(Transaction tx) throws Exception {
+                    StoredDestination sd = 
store.getStoredDestination(store.convert(topic), tx);
+
+                    if (sd != null) {
+                        Long position = sd.messageIdIndex.get(tx, messageId);
+
+                        for (SubscriptionInfo info : infos) {
+                            LastAck cursorPos = store.getLastAck(tx, sd,
+                                    store.subscriptionKey(info.getClientId(), 
info.getSubcriptionName()));
+                            if (cursorPos.lastAckedSequence < position) {
+                                matching.add(info);
+                            }
+                        }
+                    }
+                }
+            });
+        } finally {
+            store.indexLock.writeLock().unlock();
+        }
+
+        return matching;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
 
b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 73bcecd..b23ab7b 100644
--- 
a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ 
b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.cli.kahadb.exporter;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -28,6 +30,7 @@ import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -56,6 +59,7 @@ import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
 import 
org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
@@ -68,8 +72,12 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IdGenerator;
@@ -165,17 +173,19 @@ public class ExporterTest {
             xmlMarshaller.appendBindingsElement();
             xmlMarshaller.appendBinding(QueueBindingType.builder()
                     .withName("test.queue")
+                    .withRoutingType(RoutingType.ANYCAST.toString())
                     .withAddress("test.queue").build());
             xmlMarshaller.appendEndElement();
             xmlMarshaller.appendMessagesElement();
 
             KahaDBExporter dbExporter = new KahaDBExporter(adapter,
-                    new ArtemisXmlMessageRecoveryListener(xmlMarshaller));
+                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), 
xmlMarshaller));
 
             dbExporter.exportQueues();
             xmlMarshaller.appendJournalClose(true);
         }
 
+        adapter.stop();
 
         try (BufferedReader br = new BufferedReader(new FileReader(file))) {
             String line = null;
@@ -250,6 +260,139 @@ public class ExporterTest {
         artemisServer.stop();
     }
 
+    @Test
+    public void testExportTopics() throws Exception {
+
+        ActiveMQTopic topic = new ActiveMQTopic("test.topic");
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(storeFolder.newFolder());
+        adapter.start();
+        TopicMessageStore messageStore = 
adapter.createTopicMessageStore(topic);
+        messageStore.start();
+
+        SubscriptionInfo sub1 = new SubscriptionInfo("clientId1", "sub1");
+        SubscriptionInfo sub2 = new SubscriptionInfo("clientId1", "sub2");
+        sub1.setDestination(topic);
+        messageStore.addSubscription(sub1, false);
+        messageStore.addSubscription(sub2, false);
+
+        IdGenerator id = new IdGenerator();
+        ConnectionContext context = new ConnectionContext();
+        MessageId first = null;
+        for (int i = 0; i < 5; i++) {
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Test");
+            message.setProperty("MyStringProperty", "abc");
+            message.setProperty("MyIntegerProperty", 1);
+            message.setDestination(topic);
+            message.setMessageId(new MessageId(id.generateId() + ":1", i));
+            messageStore.addMessage(context, message);
+            if (i == 0) {
+                first = message.getMessageId();
+            }
+        }
+
+        //ack for sub1 only
+        messageStore.acknowledge(context, "clientId1", "sub1", first, new 
MessageAck());
+
+        messageStore.stop();
+
+      //  String queueName = 
ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", 
"sub1");
+
+        File file = storeFolder.newFile();
+        try(FileOutputStream fos = new FileOutputStream(file)) {
+            XMLStreamWriter xmlWriter = 
XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+            ArtemisJournalMarshaller xmlMarshaller = new 
ArtemisJournalMarshaller(xmlWriter);
+
+            xmlMarshaller.appendJournalOpen();
+            xmlMarshaller.appendBindingsElement();
+
+            adapter.getStore().getDestinations().stream()
+                    .filter(dest -> dest.isTopic()).forEach(dest -> {
+
+                        try {
+                            for (SubscriptionInfo info :
+                                
adapter.getStore().createTopicMessageStore((ActiveMQTopic) 
dest).getAllSubscriptions()) {
+                                
xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                        
.withName(ActiveMQDestination.createQueueNameForDurableSubscription(
+                                                true, info.getClientId(), 
info.getSubcriptionName()))
+                                        
.withRoutingType(RoutingType.MULTICAST.toString())
+                                        
.withAddress(dest.getPhysicalName()).build());
+                            }
+
+                        } catch (Exception e) {
+                            fail(e.getMessage());
+                        }
+                    });
+
+            xmlMarshaller.appendEndElement();
+            xmlMarshaller.appendMessagesElement();
+
+            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), 
xmlMarshaller));
+
+            dbExporter.exportTopics();
+            xmlMarshaller.appendJournalClose(true);
+        }
+
+        adapter.stop();
+
+        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+            String line = null;
+            while ((line = br.readLine()) != null) {
+                System.out.println(line);
+            }
+         }
+
+
+        validate(file, 5);
+
+        final ActiveMQServer artemisServer = buildArtemisBroker();
+        artemisServer.start();
+
+        XmlDataImporter dataImporter = new XmlDataImporter();
+        dataImporter.process(file.getAbsolutePath(), "localhost", 61400, 
false);
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("tcp://localhost:61400");
+
+        Connection connection = null;
+        try {
+
+            connection = cf.createConnection();
+            connection.setClientID("clientId1");
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = 
session.createSharedDurableConsumer(
+                    session.createTopic("test.topic"), "sub1");
+            MessageConsumer messageConsumer2 = 
session.createSharedDurableConsumer(
+                    session.createTopic("test.topic"), "sub2");
+
+            for (int i = 0; i < 5; i++) {
+                TextMessage messageReceived1 = (TextMessage) 
messageConsumer.receive(1000);
+                if (i < 4) {
+                    assertNotNull(messageReceived1);
+                } else {
+                    assertNull(messageReceived1);
+                }
+                TextMessage messageReceived2 = (TextMessage) 
messageConsumer2.receive(1000);
+                assertNotNull(messageReceived2);
+
+                assertEquals("abc", 
messageReceived2.getStringProperty("MyStringProperty"));
+                assertEquals("Test", messageReceived2.getText());
+            }
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+            cf.close();
+        }
+
+        artemisServer.stop();
+    }
+
     public ActiveMQServer buildArtemisBroker() throws IOException {
         Configuration configuration = new ConfigurationImpl();
 
@@ -270,14 +413,6 @@ public class ExporterTest {
         configuration.addConnectorConfiguration("connector",
                 new 
TransportConfiguration(NettyConnectorFactory.class.getName(), 
connectionParams));
 
-        configuration.addAddressConfiguration(new CoreAddressConfiguration()
-                .setName("test.queue")
-                .addRoutingType(RoutingType.ANYCAST)
-                .addQueueConfiguration(new CoreQueueConfiguration()
-                        .setAddress("test.queue")
-                        .setName("test.queue")
-                        .setRoutingType(RoutingType.ANYCAST))
-                );
 
        return new ActiveMQServerImpl(configuration);
     }

Reply via email to