Repository: activemq-cli-tools
Updated Branches:
  refs/heads/master d297503ef -> 419019cfd

AMQCLI-3 Add a utility method for exporting a store

Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/419019cf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/419019cf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/419019cf

Branch: refs/heads/master
Commit: 419019cfd5b289d87c2fd650ced0a5b4bcd9807a
Parents: d297503
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Wed Feb 15 12:37:46 2017 -0500
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Wed Feb 15 12:37:46 2017 -0500

----------------------------------------------------------------------
 .../activemq/cli/kahadb/exporter/Exporter.java | 67 +++++++++++++++
 .../cli/kahadb/exporter/ExporterTest.java      | 87 ++++----------------
 2 files changed, 84 insertions(+), 70 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/419019cf/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index 7b8f17d..49d9ae7 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -16,6 +16,23 @@
  */
 package org.apache.activemq.cli.kahadb.exporter;
 
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
+import org.apache.activemq.cli.schema.QueueBindingType;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
 /**
  * KahaDB Exporter
  */
@@ -24,4 +41,54 @@ public class Exporter {
     public static void main(String[] args) {
 
     }
+
+    public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
+
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(kahaDbDir);
+        adapter.start();
+
+        try(FileOutputStream fos = new FileOutputStream(artemisXml)) {
+            XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+            ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+            xmlMarshaller.appendJournalOpen();
+            xmlMarshaller.appendBindingsElement();
+
+            adapter.getStore().getDestinations().stream()
+                .forEach(dest -> {
+                    try {
+                        if (dest.isQueue()) {
+                            xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                    .withName(dest.getPhysicalName())
+                                    .withRoutingType(RoutingType.ANYCAST.toString())
+                                    .withAddress(dest.getPhysicalName()).build());
+                        } else if (dest.isTopic()) {
+                            for (SubscriptionInfo info :
+                                adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
+                                xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                        .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
+                                                true, info.getClientId(), info.getSubcriptionName()))
+                                        .withRoutingType(RoutingType.MULTICAST.toString())
+                                        .withAddress(dest.getPhysicalName()).build());
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                });
+
+            xmlMarshaller.appendEndElement();
+            xmlMarshaller.appendMessagesElement();
+
+            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
+
+            dbExporter.exportQueues();
+            dbExporter.exportTopics();
+            xmlMarshaller.appendJournalClose(true);
+        } finally {
+            adapter.stop();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/419019cf/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index b23ab7b..4a4d47a 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -103,10 +103,11 @@ public class ExporterTest {
 
     @Test
     public void testExportQueues() throws Exception {
+        File kahaDbDir = storeFolder.newFolder();
         ActiveMQQueue queue = new ActiveMQQueue("test.queue");
         KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
         adapter.setJournalMaxFileLength(1024 * 1024);
-        adapter.setDirectory(storeFolder.newFolder());
+        adapter.setDirectory(kahaDbDir);
         adapter.start();
         MessageStore messageStore = adapter.createQueueMessageStore(queue);
         messageStore.start();
@@ -162,32 +163,12 @@ public class ExporterTest {
             messageStore.addMessage(context, message);
         }
 
-        messageStore.stop();
-
-        File file = storeFolder.newFile();
-        try(FileOutputStream fos = new FileOutputStream(file)) {
-            XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
-            ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
-
-            xmlMarshaller.appendJournalOpen();
-            xmlMarshaller.appendBindingsElement();
-            xmlMarshaller.appendBinding(QueueBindingType.builder()
-                    .withName("test.queue")
-                    .withRoutingType(RoutingType.ANYCAST.toString())
-                    .withAddress("test.queue").build());
-            xmlMarshaller.appendEndElement();
-            xmlMarshaller.appendMessagesElement();
-
-            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
-                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
-
-            dbExporter.exportQueues();
-            xmlMarshaller.appendJournalClose(true);
-        }
-
         adapter.stop();
 
-        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+        File xmlFile = storeFolder.newFile();
+        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+
+        try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
             String line = null;
             while ((line = br.readLine()) != null) {
                 System.out.println(line);
@@ -195,13 +176,13 @@ public class ExporterTest {
         }
 
 
-        validate(file, 17);
+        validate(xmlFile, 17);
 
         final ActiveMQServer artemisServer = buildArtemisBroker();
         artemisServer.start();
 
         XmlDataImporter dataImporter = new XmlDataImporter();
-        dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+        dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
 
@@ -263,10 +244,12 @@ public class ExporterTest {
 
     @Test
     public void testExportTopics() throws Exception {
+        File kahaDbDir = storeFolder.newFolder();
+
         ActiveMQTopic topic = new ActiveMQTopic("test.topic");
         KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
         adapter.setJournalMaxFileLength(1024 * 1024);
-        adapter.setDirectory(storeFolder.newFolder());
+        adapter.setDirectory(kahaDbDir);
         adapter.start();
         TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
         messageStore.start();
@@ -296,49 +279,13 @@ public class ExporterTest {
         //ack for sub1 only
         messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck());
 
-        messageStore.stop();
-
-        // String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", "sub1");
-
-        File file = storeFolder.newFile();
-        try(FileOutputStream fos = new FileOutputStream(file)) {
-            XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
-            ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
-
-            xmlMarshaller.appendJournalOpen();
-            xmlMarshaller.appendBindingsElement();
-
-            adapter.getStore().getDestinations().stream()
-                .filter(dest -> dest.isTopic()).forEach(dest -> {
-
-                    try {
-                        for (SubscriptionInfo info :
-                            adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
-                            xmlMarshaller.appendBinding(QueueBindingType.builder()
-                                    .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
-                                            true, info.getClientId(), info.getSubcriptionName()))
-                                    .withRoutingType(RoutingType.MULTICAST.toString())
-                                    .withAddress(dest.getPhysicalName()).build());
-                        }
-
-                    } catch (Exception e) {
-                        fail(e.getMessage());
-                    }
-                });
-
-            xmlMarshaller.appendEndElement();
-            xmlMarshaller.appendMessagesElement();
-
-            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
-                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
+        adapter.stop();
 
-            dbExporter.exportTopics();
-            xmlMarshaller.appendJournalClose(true);
-        }
+        File xmlFile = storeFolder.newFile();
+        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
 
-        adapter.stop();
 
-        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+        try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
             String line = null;
             while ((line = br.readLine()) != null) {
                 System.out.println(line);
@@ -346,13 +293,13 @@ public class ExporterTest {
         }
 
 
-        validate(file, 5);
+        validate(xmlFile, 5);
 
         final ActiveMQServer artemisServer = buildArtemisBroker();
         artemisServer.start();
 
         XmlDataImporter dataImporter = new XmlDataImporter();
-        dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+        dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
