Repository: activemq-cli-tools Updated Branches: refs/heads/master 76ca845ff -> b8d33cdde
AMQCLI-4, AMQCLI-5 Adding MetadataExporter abstraction this will allow a pluggable implementation to export metadata Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/b8d33cdd Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/b8d33cdd Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/b8d33cdd Branch: refs/heads/master Commit: b8d33cddecd5fd9a5cb2eee8550c3aa2ba704a37 Parents: 76ca845 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Thu Feb 16 09:18:04 2017 -0500 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Thu Feb 16 09:18:04 2017 -0500 ---------------------------------------------------------------------- .../activemq/cli/kahadb/exporter/Exporter.java | 43 +++--------- .../cli/kahadb/exporter/KahaDBExporter.java | 47 ++++++------- .../kahadb/exporter/MessageStoreExporter.java | 2 + .../exporter/MessageStoreMetadataExporter.java | 24 +++++++ .../artemis/ArtemisXmlMetadataExporter.java | 74 ++++++++++++++++++++ 5 files changed, 130 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java index 4439a88..fee79bf 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java @@ -25,13 +25,9 @@ import java.util.zip.GZIPOutputStream; import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLStreamWriter; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller; import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener; -import org.apache.activemq.cli.schema.QueueBindingType; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMetadataExporter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,43 +63,20 @@ public class Exporter { try(OutputStream fos = new BufferedOutputStream(compress ? new GZIPOutputStream( new FileOutputStream(artemisXml)) : new FileOutputStream(artemisXml))) { - XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos); - ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter); + final XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos); + final ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter); + final KahaDBExporter dbExporter = new KahaDBExporter(adapter, + new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller), + new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller)); xmlMarshaller.appendJournalOpen(); xmlMarshaller.appendBindingsElement(); - - adapter.getStore().getDestinations().stream() - .forEach(dest -> { - try { - if (dest.isQueue()) { - xmlMarshaller.appendBinding(QueueBindingType.builder() - .withName(dest.getPhysicalName()) - .withRoutingType(RoutingType.ANYCAST.toString()) - .withAddress(dest.getPhysicalName()).build()); - } else if (dest.isTopic()) { - for (SubscriptionInfo info : - adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) { - xmlMarshaller.appendBinding(QueueBindingType.builder() - .withName(ActiveMQDestination.createQueueNameForDurableSubscription( - true, info.getClientId(), info.getSubcriptionName())) - .withRoutingType(RoutingType.MULTICAST.toString()) - .withAddress(dest.getPhysicalName()).build()); - } - } - } catch (Exception e) { - throw new IllegalStateException(e); - } - }); - + dbExporter.exportMetadata(); xmlMarshaller.appendEndElement(); xmlMarshaller.appendMessagesElement(); - - KahaDBExporter dbExporter = new KahaDBExporter(adapter, - new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller)); - dbExporter.exportQueues(); dbExporter.exportTopics(); + xmlMarshaller.appendEndElement(); xmlMarshaller.appendJournalClose(true); } finally { adapter.stop(); http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java index 7eee0aa..dbe0114 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java @@ -25,7 +25,6 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.IOExceptionSupport; import org.slf4j.Logger; @@ -36,26 +35,44 @@ public class KahaDBExporter implements MessageStoreExporter { static final Logger LOG = LoggerFactory.getLogger(KahaDBExporter.class); private final KahaDBPersistenceAdapter adapter; + private final MessageStoreMetadataExporter metadataExporter; private final MessageRecoveryListener recoveryListener; - public KahaDBExporter (final KahaDBPersistenceAdapter adapter, + public KahaDBExporter(final KahaDBPersistenceAdapter adapter, + final MessageStoreMetadataExporter metadataExporter, final MessageRecoveryListener recoveryListener) { this.adapter = adapter; + this.metadataExporter = metadataExporter; this.recoveryListener = recoveryListener; } + + @Override + public void exportMetadata() throws IOException { + metadataExporter.export(); + } + @Override public void exportQueues() throws IOException { + exportDestinations(ActiveMQDestination.QUEUE_TYPE); + } + + @Override + public void exportTopics() throws IOException { + exportDestinations(ActiveMQDestination.TOPIC_TYPE); + } + private void exportDestinations(byte destType) throws IOException { final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter( - dest -> dest.isQueue()).collect(Collectors.toSet()); + dest -> dest.getDestinationType() == destType).collect(Collectors.toSet()); // loop through all queues and export them for (final ActiveMQDestination destination : destinations) { LOG.info("Starting export of: " + destination); - final ActiveMQQueue queue = (ActiveMQQueue) destination; - final MessageStore messageStore = adapter.createQueueMessageStore(queue); + final MessageStore messageStore = destination.isQueue() ? + adapter.createQueueMessageStore((ActiveMQQueue) destination) : + adapter.createTopicMessageStore((ActiveMQTopic) destination); try { // migrate the data @@ -66,24 +83,4 @@ public class KahaDBExporter implements MessageStoreExporter { } } - @Override - public void exportTopics() throws IOException { - - final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter( - dest -> dest.isTopic()).collect(Collectors.toSet()); - - for (ActiveMQDestination destination : destinations) { - LOG.info("Starting export of: " + destination); - - final ActiveMQTopic topic = (ActiveMQTopic) destination; - final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic); - - //recover topic - try { - messageStore.recover(recoveryListener); - } catch (Exception e) { - IOExceptionSupport.create(e); - } - } - } } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java index b228e19..b1217b4 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java @@ -20,6 +20,8 @@ import java.io.IOException; public interface MessageStoreExporter { + public void exportMetadata() throws IOException; + public void exportQueues() throws IOException; public void exportTopics() throws IOException; http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java new file mode 100644 index 0000000..994528b --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter; + +import java.io.IOException; + +public interface MessageStoreMetadataExporter { + + public void export() throws IOException; +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java new file mode 100644 index 0000000..216a6a3 --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter.artemis; + +import java.io.IOException; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller; +import org.apache.activemq.cli.kahadb.exporter.MessageStoreMetadataExporter; +import org.apache.activemq.cli.schema.QueueBindingType; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; + +public class ArtemisXmlMetadataExporter implements MessageStoreMetadataExporter { + + private final KahaDBStore store; + private final ArtemisJournalMarshaller xmlMarshaller; + + + /** + * @param xmlMarshaller + */ + public ArtemisXmlMetadataExporter(final KahaDBStore store, + final ArtemisJournalMarshaller xmlMarshaller) { + super(); + this.store = store; + this.xmlMarshaller = xmlMarshaller; + } + + @Override + public void export() throws IOException { + store.getDestinations().stream() + .forEach(dest -> { + try { + if (dest.isQueue()) { + xmlMarshaller.appendBinding(QueueBindingType.builder() + .withName(dest.getPhysicalName()) + .withRoutingType(RoutingType.ANYCAST.toString()) + .withAddress(dest.getPhysicalName()).build()); + } else if (dest.isTopic()) { + for (SubscriptionInfo info : + store.createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) { + xmlMarshaller.appendBinding(QueueBindingType.builder() + .withName(ActiveMQDestination.createQueueNameForDurableSubscription( + true, info.getClientId(), info.getSubcriptionName())) + .withRoutingType(RoutingType.MULTICAST.toString()) + .withAddress(dest.getPhysicalName()).build()); + } + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + + } + +}
