Repository: activemq-cli-tools Updated Branches: refs/heads/master 5b8a4b93c -> d297503ef
AMQCLI-5 - Add support for exporting Topics https://issues.apache.org/jira/browse/AMQCLI-5 Added basic support for exporting topics. This will still need a bit of polishing Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/d297503e Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/d297503e Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/d297503e Branch: refs/heads/master Commit: d297503ef3079af2b79aee75cd67fab3579afb9f Parents: 5b8a4b9 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Wed Feb 15 10:49:24 2017 -0500 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Wed Feb 15 10:49:24 2017 -0500 ---------------------------------------------------------------------- .../cli/kahadb/exporter/KahaDBExporter.java | 21 +-- .../ArtemisXmlMessageRecoveryListener.java | 8 +- .../artemis/OpenWireMessageTypeConverter.java | 39 ++++- .../activemq/store/kahadb/KahaDBUtil.java | 83 ++++++++++ .../cli/kahadb/exporter/ExporterTest.java | 153 +++++++++++++++++-- 5 files changed, 269 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java index c178a8c..9c12644 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java @@ -23,7 +23,6 @@ import java.util.stream.Collectors; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; @@ -72,20 +71,12 @@ public class KahaDBExporter implements MessageStoreExporter { final ActiveMQTopic topic = (ActiveMQTopic) destination; final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic); - //recover subscriptions - //TODO: This will most likely run into the same message more than once if there is - //more than one durable sub on a topic so we should look at optimizing this - //Ideally we'd just recover all the messages once and then ask KahaDB which subscriptions - //have not acked the message. This will probably require a new hook into KahaDB -// for (final SubscriptionInfo subscriptionInfo : messageStore.getAllSubscriptions()) { -// -// try { -// messageStore.recoverSubscription(subscriptionInfo.getClientId(), -// subscriptionInfo.getSubscriptionName(), recoveryListener); -// } catch (Exception e) { -// IOExceptionSupport.create(e); -// } -// } + //recover topic + try { + messageStore.recover(recoveryListener); + } catch (Exception e) { + IOExceptionSupport.create(e); + } } } } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java index c2d04a2..93bd439 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java @@ -21,6 +21,7 @@ import org.apache.activemq.cli.schema.MessageType; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.kahadb.KahaDBStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,15 +32,16 @@ public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListene static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class); private final ArtemisJournalMarshaller xmlMarshaller; - private final OpenWireMessageTypeConverter converter = new OpenWireMessageTypeConverter(); - + private final OpenWireMessageTypeConverter converter; /** * @param file */ - public ArtemisXmlMessageRecoveryListener(final ArtemisJournalMarshaller xmlMarshaller) { + public ArtemisXmlMessageRecoveryListener(final KahaDBStore store, + final ArtemisJournalMarshaller xmlMarshaller) { super(); this.xmlMarshaller = xmlMarshaller; + this.converter = new OpenWireMessageTypeConverter(store); } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java index 259decc..c921b48 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java @@ -16,11 +16,10 @@ */ package org.apache.activemq.cli.kahadb.exporter.artemis; -import javax.jms.JMSException; - import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter; import org.apache.activemq.cli.schema.BodyType; import org.apache.activemq.cli.schema.MessageType; @@ -30,10 +29,22 @@ import org.apache.activemq.cli.schema.QueueType; import org.apache.activemq.cli.schema.QueuesType; import org.apache.activemq.command.Message; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.store.kahadb.KahaDBUtil; public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType> { - final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat()); + private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat()); + private final KahaDBStore store; + + + /** + * @param store + */ + public OpenWireMessageTypeConverter(KahaDBStore store) { + super(); + this.store = store; + } /* (non-Javadoc) * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message) @@ -66,11 +77,23 @@ public class OpenWireMessageTypeConverter implements OpenWireExportConverter<Mes return messageType; } - private QueuesType convertQueue(final Message message) throws JMSException { - return QueuesType.builder() - .withQueue(QueueType.builder() - .withName(message.getDestination().getPhysicalName()).build()) - .build(); + private QueuesType convertQueue(final Message message) throws Exception { + if (message.getDestination().isQueue()) { + return QueuesType.builder() + .withQueue(QueueType.builder() + .withName(message.getDestination().getPhysicalName()).build()) + .build(); + } else { + final QueuesType.Builder<Void> queuesBuilder = QueuesType.builder(); + + KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> { + queuesBuilder.addQueue(QueueType.builder().withName( + ActiveMQDestination.createQueueNameForDurableSubscription( + true, sub.getClientId(), sub.getSubcriptionName())).build()); + }); + + return queuesBuilder.build(); + } } private BodyType convertBody(final ServerMessage serverMessage) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java new file mode 100644 index 0000000..40ea71d --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.kahadb.MessageDatabase.LastAck; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.page.Transaction; + +public class KahaDBUtil { + + + /** + * Return subscriptions which have not acked this message + * + * @param store + * @param message + * @return + * @throws Exception + */ + public static List<SubscriptionInfo> getUnackedSubscriptions(KahaDBStore store, Message message) + throws Exception { + + final List<SubscriptionInfo> matching = new ArrayList<>(); + + if (!message.getDestination().isTopic()) { + return matching; + } + + ActiveMQTopic topic = (ActiveMQTopic) message.getDestination(); + String messageId = message.getMessageId().toString(); + TopicMessageStore messageStore = store.createTopicMessageStore(topic); + + store.indexLock.writeLock().lock(); + + final SubscriptionInfo[] infos = messageStore.getAllSubscriptions(); + + try { + store.pageFile.tx().execute(new Transaction.Closure<Exception>() { + @Override + public void execute(Transaction tx) throws Exception { + StoredDestination sd = store.getStoredDestination(store.convert(topic), tx); + + if (sd != null) { + Long position = sd.messageIdIndex.get(tx, messageId); + + for (SubscriptionInfo info : infos) { + LastAck cursorPos = store.getLastAck(tx, sd, + store.subscriptionKey(info.getClientId(), info.getSubcriptionName())); + if (cursorPos.lastAckedSequence < position) { + matching.add(info); + } + } + } + } + }); + } finally { + store.indexLock.writeLock().unlock(); + } + + return matching; + } +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/d297503e/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java index 73bcecd..b23ab7b 100644 --- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java +++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.cli.kahadb.exporter; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import java.io.BufferedReader; import java.io.File; @@ -28,6 +30,7 @@ import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Set; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -56,6 +59,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller; import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener; @@ -68,8 +72,12 @@ import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQStreamMessage; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IdGenerator; @@ -165,17 +173,19 @@ public class ExporterTest { xmlMarshaller.appendBindingsElement(); xmlMarshaller.appendBinding(QueueBindingType.builder() .withName("test.queue") + .withRoutingType(RoutingType.ANYCAST.toString()) .withAddress("test.queue").build()); xmlMarshaller.appendEndElement(); xmlMarshaller.appendMessagesElement(); KahaDBExporter dbExporter = new KahaDBExporter(adapter, - new ArtemisXmlMessageRecoveryListener(xmlMarshaller)); + new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller)); dbExporter.exportQueues(); xmlMarshaller.appendJournalClose(true); } + adapter.stop(); try (BufferedReader br = new BufferedReader(new FileReader(file))) { String line = null; @@ -250,6 +260,139 @@ public class ExporterTest { artemisServer.stop(); } + @Test + public void testExportTopics() throws Exception { + + ActiveMQTopic topic = new ActiveMQTopic("test.topic"); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setJournalMaxFileLength(1024 * 1024); + adapter.setDirectory(storeFolder.newFolder()); + adapter.start(); + TopicMessageStore messageStore = adapter.createTopicMessageStore(topic); + messageStore.start(); + + SubscriptionInfo sub1 = new SubscriptionInfo("clientId1", "sub1"); + SubscriptionInfo sub2 = new SubscriptionInfo("clientId1", "sub2"); + sub1.setDestination(topic); + messageStore.addSubscription(sub1, false); + messageStore.addSubscription(sub2, false); + + IdGenerator id = new IdGenerator(); + ConnectionContext context = new ConnectionContext(); + MessageId first = null; + for (int i = 0; i < 5; i++) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("Test"); + message.setProperty("MyStringProperty", "abc"); + message.setProperty("MyIntegerProperty", 1); + message.setDestination(topic); + message.setMessageId(new MessageId(id.generateId() + ":1", i)); + messageStore.addMessage(context, message); + if (i == 0) { + first = message.getMessageId(); + } + } + + //ack for sub1 only + messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck()); + + messageStore.stop(); + + // String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", "sub1"); + + File file = storeFolder.newFile(); + try(FileOutputStream fos = new FileOutputStream(file)) { + XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos); + ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter); + + xmlMarshaller.appendJournalOpen(); + xmlMarshaller.appendBindingsElement(); + + adapter.getStore().getDestinations().stream() + .filter(dest -> dest.isTopic()).forEach(dest -> { + + try { + for (SubscriptionInfo info : + adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) { + xmlMarshaller.appendBinding(QueueBindingType.builder() + .withName(ActiveMQDestination.createQueueNameForDurableSubscription( + true, info.getClientId(), info.getSubcriptionName())) + .withRoutingType(RoutingType.MULTICAST.toString()) + .withAddress(dest.getPhysicalName()).build()); + } + + } catch (Exception e) { + fail(e.getMessage()); + } + }); + + xmlMarshaller.appendEndElement(); + xmlMarshaller.appendMessagesElement(); + + KahaDBExporter dbExporter = new KahaDBExporter(adapter, + new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller)); + + dbExporter.exportTopics(); + xmlMarshaller.appendJournalClose(true); + } + + adapter.stop(); + + try (BufferedReader br = new BufferedReader(new FileReader(file))) { + String line = null; + while ((line = br.readLine()) != null) { + System.out.println(line); + } + } + + + validate(file, 5); + + final ActiveMQServer artemisServer = buildArtemisBroker(); + artemisServer.start(); + + XmlDataImporter dataImporter = new XmlDataImporter(); + dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400"); + + Connection connection = null; + try { + + connection = cf.createConnection(); + connection.setClientID("clientId1"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createSharedDurableConsumer( + session.createTopic("test.topic"), "sub1"); + MessageConsumer messageConsumer2 = session.createSharedDurableConsumer( + session.createTopic("test.topic"), "sub2"); + + for (int i = 0; i < 5; i++) { + TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(1000); + if (i < 4) { + assertNotNull(messageReceived1); + } else { + assertNull(messageReceived1); + } + TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(1000); + assertNotNull(messageReceived2); + + assertEquals("abc", messageReceived2.getStringProperty("MyStringProperty")); + assertEquals("Test", messageReceived2.getText()); + } + + } finally { + if (connection != null) { + connection.close(); + } + cf.close(); + } + + artemisServer.stop(); + } + public ActiveMQServer buildArtemisBroker() throws IOException { Configuration configuration = new ConfigurationImpl(); @@ -270,14 +413,6 @@ public class ExporterTest { configuration.addConnectorConfiguration("connector", new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams)); - configuration.addAddressConfiguration(new CoreAddressConfiguration() - .setName("test.queue") - .addRoutingType(RoutingType.ANYCAST) - .addQueueConfiguration(new CoreQueueConfiguration() - .setAddress("test.queue") - .setName("test.queue") - .setRoutingType(RoutingType.ANYCAST)) - ); return new ActiveMQServerImpl(configuration); }
