Repository: activemq-cli-tools Updated Branches: refs/heads/master 8d58305d3 -> 3fee126f1
https://issues.apache.org/jira/browse/AMQCLI-4 Initial implementation of a Queue export functionality for KahaDB to Artemis XML. This still needs some work Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/3fee126f Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/3fee126f Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/3fee126f Branch: refs/heads/master Commit: 3fee126f1b84fb75e8af39198291d229f469b311 Parents: 8d58305 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Wed Feb 8 16:02:21 2017 -0500 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Mon Feb 13 13:22:56 2017 -0500 ---------------------------------------------------------------------- activemq-kahadb-exporter/pom.xml | 24 ++ .../schema/ArtemisJournalMarshaller.java | 85 +++++- .../cli/kahadb/exporter/KahaDBExporter.java | 91 +++++++ .../kahadb/exporter/MessageStoreExporter.java | 26 ++ .../exporter/OpenWireExportConverter.java | 28 ++ .../ArtemisXmlMessageRecoveryListener.java | 75 ++++++ .../artemis/OpenWireMessageTypeConverter.java | 140 ++++++++++ .../main/resources/artemis-import-export.xjb | 5 +- .../cli/kahadb/exporter/ExporterTest.java | 269 ++++++++++++++++++- .../src/test/resources/log4j.properties | 4 +- pom.xml | 22 ++ 11 files changed, 763 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/pom.xml b/activemq-kahadb-exporter/pom.xml index 1247661..9bcbb72 100644 --- a/activemq-kahadb-exporter/pom.xml +++ b/activemq-kahadb-exporter/pom.xml @@ -34,6 +34,14 @@ <artifactId>activemq-kahadb-store</artifactId> </dependency> <dependency> + 
<groupId>org.apache.activemq</groupId> + <artifactId>artemis-cli</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-openwire-protocol</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> @@ -43,6 +51,21 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -120,6 +143,7 @@ <extensionArgs> <extensionArg>-Xfluent-builder</extensionArg> </extensionArgs> + <extension>true</extension> </xsdOption> </xsdOptions> </configuration> http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java index 82e167b..f1695e5 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java @@ -16,6 +16,11 @@ */ package org.apache.activemq.cli.artemis.schema; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.regex.Pattern; + import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBElement; import 
javax.xml.bind.JAXBException; @@ -53,7 +58,9 @@ public class ArtemisJournalMarshaller { this.context = JAXBContext.newInstance(ObjectFactory.class); this.marshaller = context.createMarshaller(); this.marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); - this.xmlWriter = xmlWriter; + + PrettyPrintHandler handler = new PrettyPrintHandler(xmlWriter); + this.xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler); } public void appendJournalOpen() throws XMLStreamException { @@ -97,4 +104,80 @@ public class ArtemisJournalMarshaller { private <T> JAXBElement<T> wrap(String name, T object) { return new JAXBElement<T>(QName.valueOf(name), (Class<T>) object.getClass(), object); } + + static class PrettyPrintHandler implements InvocationHandler { + + private static final Pattern XML_CHARS = Pattern.compile( "[&<>]" ); + + private final XMLStreamWriter target; + + private int depth = 0; + + private static final char INDENT_CHAR = ' '; + + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + boolean wrap = true; + + PrettyPrintHandler(XMLStreamWriter target) { + this.target = target; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String m = method.getName(); + boolean useCData = false; + + switch (m) { + case "writeStartElement": + target.writeCharacters(LINE_SEPARATOR); + target.writeCharacters(indent(depth)); + + depth++; + break; + case "writeEndElement": + depth--; + if (wrap) { + target.writeCharacters(LINE_SEPARATOR); + target.writeCharacters(indent(depth)); + } + wrap = true; + break; + case "writeEmptyElement": + case "writeCData": + target.writeCharacters(LINE_SEPARATOR); + target.writeCharacters(indent(depth)); + break; + case "writeCharacters": + useCData = XML_CHARS.matcher( (String)args[0] ).find(); + if (!useCData) { + wrap = false; + break; + } else { + 
target.writeCharacters(LINE_SEPARATOR); + target.writeCharacters(indent(depth)); + break; + } + } + + if (useCData) { + Method cdata = XMLStreamWriter.class.getMethod("writeCData", String.class); + args[0] = ((String)args[0]).replace("<![CDATA[", "").replace("]]>", ""); + cdata.invoke(target, args); + } else { + method.invoke(target, args); + } + + return null; + } + + private String indent(int depth) { + depth *= 3; // level of indentation + char[] output = new char[depth]; + while (depth-- > 0) { + output[depth] = INDENT_CHAR; + } + return new String(output); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java new file mode 100644 index 0000000..c178a8c --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter; + +import java.io.IOException; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.IOExceptionSupport; + +public class KahaDBExporter implements MessageStoreExporter { + + private final KahaDBPersistenceAdapter adapter; + private final MessageRecoveryListener recoveryListener; + + public KahaDBExporter (final KahaDBPersistenceAdapter adapter, + final MessageRecoveryListener recoveryListener) { + this.adapter = adapter; + this.recoveryListener = recoveryListener; + } + + @Override + public void exportQueues() throws IOException { + + final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter( + dest -> dest.isQueue()).collect(Collectors.toSet()); + + // loop through all queues and export them + for (final ActiveMQDestination destination : destinations) { + + final ActiveMQQueue queue = (ActiveMQQueue) destination; + final MessageStore messageStore = adapter.createQueueMessageStore(queue); + + try { + // migrate the data + messageStore.recover(recoveryListener); + } catch (Exception e) { + IOExceptionSupport.create(e); + } + } + } + + @Override + public void exportTopics() throws IOException { + + final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter( + dest -> dest.isTopic()).collect(Collectors.toSet()); + + for (ActiveMQDestination destination : destinations) { + final 
ActiveMQTopic topic = (ActiveMQTopic) destination; + final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic); + + //recover subscriptions + //TODO: This will most likely run into the same message more than once if there is + //more than one durable sub on a topic so we should look at optimizing this + //Ideally we'd just recover all the messages once and then ask KahaDB which subscriptions + //have not acked the message. This will probably require a new hook into KahaDB +// for (final SubscriptionInfo subscriptionInfo : messageStore.getAllSubscriptions()) { +// +// try { +// messageStore.recoverSubscription(subscriptionInfo.getClientId(), +// subscriptionInfo.getSubscriptionName(), recoveryListener); +// } catch (Exception e) { +// IOExceptionSupport.create(e); +// } +// } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java new file mode 100644 index 0000000..b228e19 --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter; + +import java.io.IOException; + +public interface MessageStoreExporter { + + public void exportQueues() throws IOException; + + public void exportTopics() throws IOException; +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java new file mode 100644 index 0000000..f364c20 --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter; + +import org.apache.activemq.command.Message; + +/** + * Convert an OpenWire message to another format + */ +public interface OpenWireExportConverter <T> { + + public T convert (final Message message) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java new file mode 100644 index 0000000..c2d04a2 --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter.artemis; + +import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller; +import org.apache.activemq.cli.schema.MessageType; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageRecoveryListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recovery listener that can be used to export messages to Artemis + */ +public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListener { + static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class); + + private final ArtemisJournalMarshaller xmlMarshaller; + private final OpenWireMessageTypeConverter converter = new OpenWireMessageTypeConverter(); + + + /** + * @param file + */ + public ArtemisXmlMessageRecoveryListener(final ArtemisJournalMarshaller xmlMarshaller) { + super(); + this.xmlMarshaller = xmlMarshaller; + } + + + @Override + public boolean recoverMessage(Message message) throws Exception { + try { + MessageType messageType = converter.convert(message); + xmlMarshaller.appendMessage(messageType); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return false; + } + return true; + } + + + @Override + public boolean recoverMessageReference(MessageId ref) throws Exception { + return false; + } + + @Override + public boolean hasSpace() { + return true; + } + + + @Override + public boolean isDuplicate(MessageId ref) { + return false; + } + +} 
http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java new file mode 100644 index 0000000..8d24e96 --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.cli.kahadb.exporter.artemis; + +import javax.jms.JMSException; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter; +import org.apache.activemq.cli.schema.BodyType; +import org.apache.activemq.cli.schema.MessageType; +import org.apache.activemq.cli.schema.PropertiesType; +import org.apache.activemq.cli.schema.PropertyType; +import org.apache.activemq.cli.schema.QueueType; +import org.apache.activemq.cli.schema.QueuesType; +import org.apache.activemq.command.Message; +import org.apache.activemq.openwire.OpenWireFormat; + +public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType> { + + static final String MESSAGE_TIMESTAMP = "timestamp"; + static final String DEFAULT_TYPE_PRETTY = "default"; + static final String BYTES_TYPE_PRETTY = "bytes"; + static final String MAP_TYPE_PRETTY = "map"; + static final String OBJECT_TYPE_PRETTY = "object"; + static final String STREAM_TYPE_PRETTY = "stream"; + static final String TEXT_TYPE_PRETTY = "text"; + + final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat()); + + /* (non-Javadoc) + * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message) + */ + @Override + public MessageType convert(final Message message) throws Exception { + final ServerMessage serverMessage = converter.inbound(message); + final MessageType messageType = convertAttributes(serverMessage); + + try { + if (!message.getProperties().isEmpty()) { + final PropertiesType propertiesType = new PropertiesType(); + serverMessage.getPropertyNames().forEach(key -> { + Object value = serverMessage.getObjectProperty(key); + 
propertiesType.getProperty().add(PropertyType.builder() + .withName(key.toString()) + .withValueAttribute(convertPropertyValue(value)) + .withType(convertPropertyType(value.getClass())) + .build()); + }); + messageType.setProperties(propertiesType); + } + + messageType.setQueues(convertQueue(message)); + messageType.setBody(convertBody(serverMessage)); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + + return messageType; + } + + private QueuesType convertQueue(final Message message) throws JMSException { + + return QueuesType.builder() + .withQueue(QueueType.builder() + .withName(message.getDestination().getPhysicalName()).build()) + .build(); + } + + private BodyType convertBody(final ServerMessage serverMessage) throws Exception { + int size = serverMessage.getEndOfBodyPosition() - serverMessage.getBodyBuffer().readerIndex(); + byte[] buffer = new byte[size]; + serverMessage.getBodyBuffer().readBytes(buffer); + String value = encode(buffer); + + //requires CDATA + return BodyType.builder() + .withValue("<![CDATA[" + value + "]]>") + .build(); + } + + private String convertPropertyValue(Object value) { + if (value instanceof byte[]) { + return encode((byte[]) value).toString(); + } + return value.toString(); + } + + private MessageType convertAttributes(final ServerMessage message) { + MessageType messageType = MessageType.builder() + .withId(message.getMessageID()) + .withTimestamp(message.getTimestamp()) + .withPriority(message.getPriority()).build(); + + byte rawType = message.getType(); + String prettyType = DEFAULT_TYPE_PRETTY; + if (rawType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { + prettyType = BYTES_TYPE_PRETTY; + } else if (rawType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { + prettyType = MAP_TYPE_PRETTY; + } else if (rawType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { + prettyType = OBJECT_TYPE_PRETTY; + } else if (rawType == 
org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { + prettyType = STREAM_TYPE_PRETTY; + } else if (rawType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { + prettyType = TEXT_TYPE_PRETTY; + } + + messageType.setType(prettyType); + return messageType; + } + + private String convertPropertyType(Class<?> clazz) { + if (clazz.equals(SimpleString.class)) { + return String.class.getSimpleName().toLowerCase(); + } + return clazz.getSimpleName().toLowerCase(); + } + + private static String encode(final byte[] data) { + return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb b/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb index 3f2eea3..ce7670e 100644 --- a/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb +++ b/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb @@ -17,8 +17,8 @@ --> <bindings xmlns="http://java.sun.com/xml/ns/jaxb" xmlns:xsi="http://www.w3.org/2000/10/XMLSchema-instance" xmlns:xs="http://www.w3.org/2001/XMLSchema" - extensionBindingPrefixes="xjc annox" - xmlns:annox="http://annox.dev.java.net" version="2.1"> + xmlns:xjc="http://java.sun.com/xml/ns/jaxb/xjc" + extensionBindingPrefixes="xjc" version="2.1"> <bindings schemaLocation="artemis-import-export.xsd" version="1.0"> <!-- Customize the package name --> @@ -32,5 +32,6 @@ <property name="valueAttribute" /> </bindings> </bindings> + </bindings> </bindings> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java 
---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java index df55a20..84edb33 100644 --- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java +++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java @@ -16,11 +16,278 @@ */ package org.apache.activemq.cli.kahadb.exporter; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamWriter; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; +import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller; +import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener; +import org.apache.activemq.cli.schema.ActivemqJournalType; +import org.apache.activemq.cli.schema.ObjectFactory; +import org.apache.activemq.cli.schema.QueueBindingType; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IdGenerator; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ExporterTest { + static final Logger LOG = LoggerFactory.getLogger(ExporterTest.class); + + @Rule + public TemporaryFolder storeFolder = new TemporaryFolder(); + + /** + * TODO Improve test when real exporting is done, for now this just + * tests that the recovery listener iterates over all the queue messages + * + * @throws Exception + */ @Test - public void test() { + public void testExportQueues() throws Exception { + + ActiveMQQueue queue = new ActiveMQQueue("test.queue"); + 
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setJournalMaxFileLength(1024 * 1024); + adapter.setDirectory(storeFolder.newFolder()); + adapter.start(); + MessageStore messageStore = adapter.createQueueMessageStore(queue); + messageStore.start(); + + IdGenerator id = new IdGenerator(); + ConnectionContext context = new ConnectionContext(); + for (int i = 0; i < 5; i++) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("Test"); + message.setProperty("MyStringProperty", "abc"); + message.setProperty("MyIntegerProperty", 1); + message.setDestination(queue); + message.setMessageId(new MessageId(id.generateId() + ":1", i)); + messageStore.addMessage(context, message); + } + byte[] bytes = new byte[] {10, 11, 12}; + for (int i = 0; i < 3; i++) { + ActiveMQBytesMessage message = new ActiveMQBytesMessage(); + + message.setContent(new ByteSequence(bytes)); + message.setProperty("MyStringProperty", "abc"); + message.setProperty("MyByteProperty", (byte)10); + message.setDestination(queue); + message.setMessageId(new MessageId(id.generateId() + ":2", i)); + messageStore.addMessage(context, message); + } + + for (int i = 0; i < 3; i++) { + ActiveMQMapMessage message = new ActiveMQMapMessage(); + message.setObject("key", "value"); + message.setObject("key2", 10); + message.setProperty("MyStringProperty", "abc"); + message.setDestination(queue); + message.setMessageId(new MessageId(id.generateId() + ":3", i)); + messageStore.addMessage(context, message); + } + + Date date = new Date(); + for (int i = 0; i < 3; i++) { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + message.setObject(date); + message.setDestination(queue); + message.setMessageId(new MessageId(id.generateId() + ":4", i)); + messageStore.addMessage(context, message); + } + + for (int i = 0; i < 3; i++) { + ActiveMQStreamMessage message = new ActiveMQStreamMessage(); + message.writeByte((byte)10); + message.storeContentAndClear(); + 
message.setDestination(queue); + message.setMessageId(new MessageId(id.generateId() + ":5", i)); + messageStore.addMessage(context, message); + } + + messageStore.stop(); + + File file = storeFolder.newFile(); + try(FileOutputStream fos = new FileOutputStream(file)) { + XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos); + ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter); + + xmlMarshaller.appendJournalOpen(); + xmlMarshaller.appendBindingsElement(); + xmlMarshaller.appendBinding(QueueBindingType.builder() + .withName("test.queue") + .withAddress("test.queue").build()); + xmlMarshaller.appendEndElement(); + xmlMarshaller.appendMessagesElement(); + + KahaDBExporter dbExporter = new KahaDBExporter(adapter, + new ArtemisXmlMessageRecoveryListener(xmlMarshaller)); + + dbExporter.exportQueues(); + xmlMarshaller.appendJournalClose(true); + } + + + try (BufferedReader br = new BufferedReader(new FileReader(file))) { + String line = null; + while ((line = br.readLine()) != null) { + System.out.println(line); + } + } + + + validate(file, 17); + + final ActiveMQServer artemisServer = buildArtemisBroker(); + artemisServer.start(); + + XmlDataImporter dataImporter = new XmlDataImporter(); + dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400"); + + Connection connection = null; + try { + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue("test.queue")); + + for (int i = 0; i < 5; i++) { + TextMessage messageReceived = (TextMessage) messageConsumer.receive(1000); + assertNotNull(messageReceived); + assertEquals("abc", messageReceived.getStringProperty("MyStringProperty")); + assertEquals("Test", messageReceived.getText()); + } + + for (int 
i = 0; i < 3; i++) { + BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(1000); + assertNotNull(messageReceived); + assertEquals("abc", messageReceived.getStringProperty("MyStringProperty")); + assertEquals((byte)10, messageReceived.getByteProperty("MyByteProperty")); + byte[] result = new byte[3]; + messageReceived.readBytes(result); + assertArrayEquals(bytes, result); + } + + for (int i = 0; i < 3; i++) { + MapMessage messageReceived = (MapMessage) messageConsumer.receive(1000); + assertNotNull(messageReceived); + assertEquals("abc", messageReceived.getStringProperty("MyStringProperty")); + assertEquals("value", messageReceived.getObject("key")); + } + + for (int i = 0; i < 3; i++) { + ObjectMessage messageReceived = (ObjectMessage) messageConsumer.receive(1000); + assertNotNull(messageReceived); + assertEquals(date, messageReceived.getObject()); + } + + for (int i = 0; i < 3; i++) { + StreamMessage messageReceived = (StreamMessage) messageConsumer.receive(1000); + assertNotNull(messageReceived); + assertEquals((byte)10, messageReceived.readByte()); + } + + } finally { + if (connection != null) { + connection.close(); + } + cf.close(); + } + + artemisServer.stop(); } + + /** Builds, but does not start, the Artemis server used by the import test. The configuration enables persistence, disables security, registers a Netty acceptor and a Netty connector named "connector" that share the same parameters (port 61400), points the bindings, journal, large-messages and paging directories at separate new folders obtained from storeFolder, and declares an ANYCAST address "test.queue" holding an ANYCAST queue of the same name. Returns the unstarted server; throws IOException if storeFolder cannot create a folder. */ public ActiveMQServer buildArtemisBroker() throws IOException { + Configuration configuration = new ConfigurationImpl(); + + configuration.setPersistenceEnabled(true); + configuration.setSecurityEnabled(false); + + Map<String, Object> connectionParams = new HashMap<String, Object>(); + connectionParams.put( + org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 61400); + + configuration.setBindingsDirectory(storeFolder.newFolder().getAbsolutePath()); + configuration.setJournalDirectory(storeFolder.newFolder().getAbsolutePath()); + configuration.setLargeMessagesDirectory(storeFolder.newFolder().getAbsolutePath()); + configuration.setPagingDirectory(storeFolder.newFolder().getAbsolutePath()); + + configuration.addAcceptorConfiguration( + new 
TransportConfiguration(NettyAcceptorFactory.class.getName(), connectionParams)); + configuration.addConnectorConfiguration("connector", + new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams)); + + configuration.addAddressConfiguration(new CoreAddressConfiguration() + .setName("test.queue") + .addRoutingType(RoutingType. ANYCAST) + .addQueueConfiguration(new CoreQueueConfiguration() + .setAddress("test.queue") + .setName("test.queue") + .setRoutingType(RoutingType.ANYCAST)) + ); + + return new ActiveMQServerImpl(configuration); + } + + /** Unmarshals the given exported XML file with a JAXB context built from ObjectFactory and asserts that the journal's message list contains exactly {@code count} entries. Throws JAXBException if the context cannot be created or the file cannot be unmarshalled. */ @SuppressWarnings("unchecked") + private void validate(File file, int count) throws JAXBException { + JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class); + Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller(); + /* The cast below is the unchecked operation covered by the @SuppressWarnings on this method. */ JAXBElement<ActivemqJournalType> read = (JAXBElement<ActivemqJournalType>) jaxbUnmarshaller.unmarshal(file); + assertEquals(count, read.getValue().getMessages().getMessage().size()); + } + } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/test/resources/log4j.properties b/activemq-kahadb-exporter/src/test/resources/log4j.properties index c1b759a..b9e69aa 100644 --- a/activemq-kahadb-exporter/src/test/resources/log4j.properties +++ b/activemq-kahadb-exporter/src/test/resources/log4j.properties @@ -18,9 +18,9 @@ # # The logging properties used during tests.. 
# -log4j.rootLogger=INFO, out, stdout +log4j.rootLogger=DEBUG, out, stdout -#log4j.logger.org.apache.activemq.store.kahadb=TRACE +log4j.logger.org.apache.activemq=DEBUG # CONSOLE appender not used by default log4j.appender.stdout=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index dc06c89..9541e1b 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,8 @@ <activemq-version>5.14.3</activemq-version> <artemis-version>2.0.0-SNAPSHOT</artemis-version> + <slf4j-version>1.7.13</slf4j-version> + <log4j-version>1.2.17</log4j-version> <!-- Test dependency versions --> <junit-version>4.12</junit-version> @@ -82,6 +84,11 @@ <artifactId>artemis-cli</artifactId> <version>${artemis-version}</version> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-openwire-protocol</artifactId> + <version>${artemis-version}</version> + </dependency> <!-- Test dependencies --> <dependency> @@ -96,6 +103,21 @@ <version>${mockito-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j-version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <version>${slf4j-version}</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>${log4j-version}</version> + </dependency> </dependencies> </dependencyManagement>
