Repository: activemq-artemis Updated Branches: refs/heads/master 13fac8608 -> ab9f5128b
ARTEMIS-1840 Added FQQN Import/Export Live Broker Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64ce26e7 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64ce26e7 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64ce26e7 Branch: refs/heads/master Commit: 64ce26e7cc06c0e5779430a8126128d11da561bd Parents: 812776f Author: Martyn Taylor <[email protected]> Authored: Wed May 2 11:35:17 2018 +0100 Committer: Clebert Suconic <[email protected]> Committed: Wed May 2 12:09:50 2018 -0400 ---------------------------------------------------------------------- .../artemis/cli/commands/messages/Consumer.java | 57 ++- .../cli/commands/messages/ConsumerThread.java | 73 ++-- .../cli/commands/messages/DestAbstract.java | 96 +++++ .../artemis/cli/commands/messages/Producer.java | 121 +++++-- .../cli/commands/messages/ProducerThread.java | 11 + .../factory/serialize/MessageSerializer.java | 37 ++ .../factory/serialize/XMLMessageSerializer.java | 118 ++++++ .../cli/test/MessageSerializerTest.java | 362 +++++++++++++++++++ 8 files changed, 823 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java index ee15a66..856e82b 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java @@ -20,11 +20,16 @@ package org.apache.activemq.artemis.cli.commands.messages; 
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageListener; import javax.jms.Session; +import java.io.FileOutputStream; +import java.io.OutputStream; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer; @Command(name = "consumer", description = "It will consume messages from an instance") public class Consumer extends DestAbstract { @@ -41,6 +46,9 @@ public class Consumer extends DestAbstract { @Option(name = "--filter", description = "filter to be used with the consumer") String filter; + @Option(name = "--data", description = "serialize the messages to the specified file as they are consumed") + String file; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ -49,7 +57,34 @@ public class Consumer extends DestAbstract { ConnectionFactory factory = createConnectionFactory(); + SerialiserMessageListener listener = null; + MessageSerializer messageSerializer = null; + if (file != null) { + try { + String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER : serializer; + if (className.equals(DEFAULT_MESSAGE_SERIALIZER) && !protocol.equalsIgnoreCase("CORE")) { + System.err.println("Default Serializer does not support: " + protocol + " protocol"); + return null; + } + messageSerializer = (MessageSerializer) Class.forName(className).getConstructor().newInstance(); + } catch (Exception e) { + System.err.println("Error. 
Unable to instantiate serializer class: " + serializer); + return null; + } + + try { + OutputStream out = new FileOutputStream(file); + listener = new SerialiserMessageListener(messageSerializer, out); + } catch (Exception e) { + System.err.println("Error: Unable to open file for writing\n" + e.getMessage()); + return null; + } + } + + if (messageSerializer != null) messageSerializer.start(); + try (Connection connection = factory.createConnection()) { + // We read messages in a single thread when persisting to file. ConsumerThread[] threadsArray = new ConsumerThread[threads]; for (int i = 0; i < threads; i++) { Session session; @@ -58,10 +93,13 @@ public class Consumer extends DestAbstract { } else { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - Destination dest = lookupDestination(session); + + // Do validation on FQQN + Destination dest = isFQQN() ? session.createQueue(getFQQNFromDestination(destination)) : lookupDestination(session); threadsArray[i] = new ConsumerThread(session, dest, i); - threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false); + threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull) + .setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener); } for (ConsumerThread thread : threadsArray) { @@ -77,9 +115,24 @@ public class Consumer extends DestAbstract { received += thread.getReceived(); } + if (messageSerializer != null) messageSerializer.stop(); + return received; } } + private class SerialiserMessageListener implements MessageListener { + private MessageSerializer messageSerializer; + + SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception { + 
this.messageSerializer = messageSerializer; + this.messageSerializer.setOutput(outputStream); + } + + @Override + public void onMessage(Message message) { + messageSerializer.write(message); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java index ab3640b..9fbff81 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -21,6 +21,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueBrowser; @@ -50,6 +51,7 @@ public class ConsumerThread extends Thread { boolean running = false; CountDownLatch finished; boolean bytesAsText; + MessageListener listener; public ConsumerThread(Session session, Destination destination, int threadNr) { super("Consumer " + destination.toString() + ", thread=" + threadNr); @@ -66,6 +68,43 @@ public class ConsumerThread extends Thread { } } + private void handle(Message msg, boolean browse) throws JMSException { + if (listener != null) { + listener.onMessage(msg); + } else { + if (browse) { + if (verbose) { + System.out.println("..." 
+ msg); + } + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Message:" + msg); + } + } else { + if (verbose) { + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Received a message with " + bytes.length); + } + + if (msg instanceof TextMessage) { + String text = ((TextMessage) msg).getText(); + System.out.println("Received text sized at " + text.length()); + } + + if (msg instanceof ObjectMessage) { + Object obj = ((ObjectMessage) msg).getObject(); + System.out.println("Received object " + obj.toString().length()); + } + } + } + } + } + public void browse() { running = true; QueueBrowser consumer = null; @@ -83,16 +122,7 @@ public class ConsumerThread extends Thread { Message msg = enumBrowse.nextElement(); if (msg != null) { System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); - - if (verbose) { - System.out.println("..." 
+ msg); - } - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Message:" + msg); - } + handle(msg, true); received++; if (received >= messageCount) { @@ -158,24 +188,7 @@ public class ConsumerThread extends Thread { System.out.println("Received " + count); } } - if (verbose) { - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Received a message with " + bytes.length); - } - - if (msg instanceof TextMessage) { - String text = ((TextMessage) msg).getText(); - System.out.println("Received text sized at " + text.length()); - } - - if (msg instanceof ObjectMessage) { - Object obj = ((ObjectMessage) msg).getObject(); - System.out.println("Received object " + obj.toString().length()); - } - } + handle(msg, false); received++; } else { if (breakOnNull) { @@ -334,4 +347,8 @@ public class ConsumerThread extends Thread { this.browse = browse; return this; } + + public void setListener(MessageListener listener) { + this.listener = listener; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java index 2f4a34c..63b5f17 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java @@ -19,12 +19,30 @@ package 
org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Destination; import javax.jms.Session; +import java.nio.ByteBuffer; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer; +import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; public class DestAbstract extends ConnectionAbstract { + public static final String DEFAULT_MESSAGE_SERIALIZER = "org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer"; + + private static final String FQQN_PREFIX = "fqqn://"; + + private static final String FQQN_SEPERATOR = "::"; + @Option(name = "--destination", description = "Destination to be used. 
It can be prefixed with queue:// or topic:// (Default: queue://TEST)") String destination = "queue://TEST"; @@ -40,6 +58,25 @@ public class DestAbstract extends ConnectionAbstract { @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)") int threads = 1; + @Option(name = "--serializer", description = "Override the default serializer with a custom implementation") + String serializer; + + protected boolean isFQQN() throws ActiveMQException { + boolean fqqn = destination.contains("::"); + if (fqqn) { + if (!destination.startsWith("fqqn://")) { + throw new ActiveMQException("FQQN destinations must start with the fqqn:// prefix"); + } + + if (protocol.equalsIgnoreCase("AMQP")) { + throw new ActiveMQException("Sending to FQQN destinations is not support via AMQP protocol"); + } + return true; + } else { + return false; + } + } + protected Destination lookupDestination(Session session) throws Exception { if (protocol.equals("AMQP")) { return session.createQueue(destination); @@ -48,4 +85,63 @@ public class DestAbstract extends ConnectionAbstract { } } + protected MessageSerializer getMessageSerializer() { + if (serializer == null) return new XMLMessageSerializer(); + try { + return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance(); + } catch (Exception e) { + System.out.println("Error: unable to instantiate serializer class: " + serializer); + System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER); + } + return new XMLMessageSerializer(); + } + + // FIXME We currently do not support producing to FQQN. This is a work around. 
+ private ClientSession getManagementSession() throws Exception { + ServerLocator serverLocator = ActiveMQClient.createServerLocator(brokerURL); + ClientSessionFactory sf = serverLocator.createSessionFactory(); + + ClientSession managementSession; + if (user != null || password != null) { + managementSession = sf.createSession(user, password, false, true, true, false, 0); + } else { + managementSession = sf.createSession(false, true, true); + } + return managementSession; + } + + public byte[] getQueueIdFromName(String queueName) throws Exception { + ClientMessage message = getQueueAttribute(queueName, "ID"); + Number idObject = (Number) ManagementHelper.getResult(message); + ByteBuffer byteBuffer = ByteBuffer.allocate(8); + byteBuffer.putLong(idObject.longValue()); + return byteBuffer.array(); + } + + protected ClientMessage getQueueAttribute(String queueName, String attribute) throws Exception { + ClientSession managementSession = getManagementSession(); + managementSession.start(); + + try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) { + ClientMessage managementMessage = managementSession.createMessage(false); + ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queueName, attribute); + managementSession.start(); + ClientMessage reply = requestor.request(managementMessage); + return reply; + } finally { + managementSession.stop(); + } + } + + protected String getQueueFromFQQN(String fqqn) { + return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) + FQQN_SEPERATOR.length()); + } + + protected String getAddressFromFQQN(String fqqn) { + return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(), fqqn.indexOf(FQQN_SEPERATOR)); + } + + protected String getFQQNFromDestination(String destination) { + return destination.substring(destination.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length()); + } } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java index 3cb5eff..0936578 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java @@ -19,12 +19,21 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.Session; +import java.io.FileInputStream; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; @Command(name = "producer", description = "It will send messages to an instance") public class Producer extends DestAbstract { @@ -49,6 +58,9 @@ public class Producer extends DestAbstract { @Option(name = "--group", description = "Message Group to be used") String msgGroupID = null; + @Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.") + String fileName = null; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ 
-56,35 +68,100 @@ public class Producer extends DestAbstract { ConnectionFactory factory = createConnectionFactory(); try (Connection connection = factory.createConnection()) { - ProducerThread[] threadsArray = new ProducerThread[threads]; - for (int i = 0; i < threads; i++) { - Session session; - if (txBatchSize > 0) { - session = connection.createSession(true, Session.SESSION_TRANSACTED); - } else { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - Destination dest = lookupDestination(session); - threadsArray[i] = new ProducerThread(session, dest, i); - threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). - setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize). - setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). - setMessageCount(messageCount); + byte[] queueId = null; + boolean isFQQN = isFQQN(); + if (isFQQN) { + queueId = getQueueIdFromName(getQueueFromFQQN(destination)); } - for (ProducerThread thread : threadsArray) { - thread.start(); - } + // If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation. 
+ if (fileName != null) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination dest = lookupDestination(session, isFQQN); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + int messageCount = 0; + try { + MessageSerializer serializer = getMessageSerializer(); + serializer.setInput(new FileInputStream(fileName), session); + serializer.start(); + + Message message = serializer.read(); + + while (message != null) { + if (queueId != null) ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId); + producer.send(message); + message = serializer.read(); + messageCount++; + } + + session.commit(); + serializer.stop(); + } catch (Exception e) { + System.err.println("Error occurred during import. Rolling back."); + session.rollback(); + e.printStackTrace(); + return 0; + } + System.out.println("Sent " + messageCount + " Messages."); + return messageCount; + } else { + ProducerThread[] threadsArray = new ProducerThread[threads]; + for (int i = 0; i < threads; i++) { + Session session; + if (txBatchSize > 0) { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + Destination dest = lookupDestination(session, isFQQN); + threadsArray[i] = new ProducerThread(session, dest, i); + + threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). + setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize). + setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). 
+ setMessageCount(messageCount).setQueueId(queueId); + } + + for (ProducerThread thread : threadsArray) { + thread.start(); + } - int messagesProduced = 0; - for (ProducerThread thread : threadsArray) { - thread.join(); - messagesProduced += thread.getSentCount(); + int messagesProduced = 0; + for (ProducerThread thread : threadsArray) { + thread.join(); + messagesProduced += thread.getSentCount(); + } + return messagesProduced; } + } + } - return messagesProduced; + public Destination lookupDestination(Session session, boolean isFQQN) throws Exception { + Destination dest; + if (!isFQQN) { + dest = lookupDestination(session); + } else { + String address = getAddressFromFQQN(destination); + if (isFQQNAnycast(getQueueFromFQQN(destination))) { + String queue = getQueueFromFQQN(destination); + if (!queue.equals(address)) { + throw new ActiveMQException("FQQN support is limited to Anycast queues where the queue name equals the address."); + } + dest = session.createQueue(address); + } else { + dest = session.createTopic(address); + } } + return dest; } + protected boolean isFQQNAnycast(String queueName) throws Exception { + ClientMessage message = getQueueAttribute(queueName, "RoutingType"); + String routingType = (String) ManagementHelper.getResult(message); + return routingType.equalsIgnoreCase("anycast"); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java index 6e9fc5c..58a57ef 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java +++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java @@ -30,6 +30,7 @@ import java.io.InputStreamReader; import java.net.URL; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.utils.ReusableLatch; public class ProducerThread extends Thread { @@ -48,6 +49,7 @@ public class ProducerThread extends Thread { long msgTTL = 0L; String msgGroupID = null; int transactionBatchSize; + byte[] queueId = null; int transactions = 0; final AtomicInteger sentCount = new AtomicInteger(0); @@ -121,6 +123,11 @@ public class ProducerThread extends Thread { private void sendMessage(MessageProducer producer, String threadName) throws Exception { Message message = createMessage(sentCount.get(), threadName); + + if (queueId != null) { + ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId); + } + producer.send(message); if (verbose) { System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? 
((TextMessage) message).getText() : message.getJMSMessageID())); @@ -370,4 +377,8 @@ public class ProducerThread extends Thread { this.objectSize = objectSize; return this; } + + public void setQueueId(byte[] queueId) { + this.queueId = queueId; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java new file mode 100644 index 0000000..9207908 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.cli.factory.serialize; + +import javax.jms.Message; +import javax.jms.Session; +import java.io.InputStream; +import java.io.OutputStream; + +public interface MessageSerializer { + + Message read() throws Exception; + + void write(Message message); + + void setOutput(OutputStream out) throws Exception; + + void setInput(InputStream in, Session session) throws Exception; + + void start() throws Exception; + + void stop() throws Exception; +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java new file mode 100644 index 0000000..e353b94 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.cli.factory.serialize;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Proxy;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageExporter;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageImporter;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
+
+/**
+ * {@link MessageSerializer} that stores JMS messages as XML, delegating the per-message work to
+ * {@link XMLMessageExporter} (writing) and {@link XMLMessageImporter} (reading).
+ * <p>
+ * Expected call order: {@link #setOutput(OutputStream)} and/or {@link #setInput(InputStream, Session)},
+ * then {@link #start()}, then any number of {@link #write(Message)} / {@link #read()} calls, then
+ * {@link #stop()}.
+ * <p>
+ * {@code read}, {@code write}, {@code start} and {@code stop} are synchronized on this instance;
+ * {@code setInput} and {@code setOutput} are not.
+ */
+public class XMLMessageSerializer implements MessageSerializer {
+
+   private XMLMessageExporter writer;
+
+   private XMLMessageImporter reader;
+
+   private ClientSession clientSession;
+
+   private OutputStream out;
+
+   /**
+    * Reads the next message from the input configured through {@link #setInput(InputStream, Session)}.
+    *
+    * @return the next message, or {@code null} once the enclosing parent element is reached again
+    *         or the importer yields no message
+    * @throws Exception if the XML cannot be parsed or the message cannot be rebuilt
+    */
+   @Override
+   public synchronized Message read() throws Exception {
+      reader.getRawXMLReader().nextTag();
+
+      // start() already consumed the opening parent tag, so meeting the parent element here means
+      // we are on its closing tag: end of document. Uses the same constant start() writes.
+      if (XmlDataConstants.MESSAGES_PARENT.equals(reader.getRawXMLReader().getLocalName())) {
+         return null;
+      }
+
+      XMLMessageImporter.MessageInfo messageInfo = reader.readMessage(true);
+      if (messageInfo == null) {
+         return null;
+      }
+
+      ActiveMQMessage jmsMessage = new ActiveMQMessage((ClientMessage) messageInfo.message, clientSession);
+      if (messageInfo.tempFile != null) {
+         // This is a large message: the importer handed back a temp file, so the body is streamed from it.
+         jmsMessage.setInputStream(new FileInputStream(messageInfo.tempFile));
+      }
+      return jmsMessage;
+   }
+
+   /**
+    * Appends one message to the output configured through {@link #setOutput(OutputStream)}.
+    *
+    * @param message the message to export; it is cast to {@link ActiveMQMessage}
+    * @throws RuntimeException wrapping any failure raised while exporting the message
+    */
+   @Override
+   public synchronized void write(Message message) {
+      try {
+         ICoreMessage core = ((ActiveMQMessage) message).getCoreMessage();
+         writer.printSingleMessageAsXML(core, null, true);
+      } catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   /**
+    * Configures this serializer for writing.
+    *
+    * @param outputStream the stream that receives the UTF-8 encoded XML
+    * @throws Exception if the XML writer cannot be created
+    */
+   @Override
+   public void setOutput(OutputStream outputStream) throws Exception {
+      this.out = outputStream;
+      XMLOutputFactory factory = XMLOutputFactory.newInstance();
+      XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(outputStream, "UTF-8");
+      // Route every writer call through the pretty-print handler by proxying the XMLStreamWriter interface.
+      XmlDataExporter.PrettyPrintHandler handler = new XmlDataExporter.PrettyPrintHandler(rawXmlWriter);
+      XMLStreamWriter xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
+      this.writer = new XMLMessageExporter(xmlWriter);
+   }
+
+   /**
+    * Configures this serializer for reading.
+    *
+    * @param inputStream the stream holding the XML to import
+    * @param session     the JMS session whose core session is used to rebuild messages; it is cast
+    *                    to {@link ActiveMQSession}
+    * @throws Exception if the XML reader cannot be created
+    */
+   @Override
+   public void setInput(InputStream inputStream, Session session) throws Exception {
+      XMLInputFactory factory = XMLInputFactory.newInstance();
+      // The stream content is external data: never process DTDs or resolve external entities (XXE).
+      factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
+      factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
+      XMLStreamReader streamReader = factory.createXMLStreamReader(inputStream);
+      this.clientSession = ((ActiveMQSession) session).getCoreSession();
+      this.reader = new XMLMessageImporter(streamReader, clientSession);
+   }
+
+   /**
+    * Opens the document: writes the XML declaration and the parent element when an output is
+    * configured, and advances past the opening parent tag when an input is configured.
+    *
+    * @throws Exception if writing or parsing fails
+    */
+   @Override
+   public synchronized void start() throws Exception {
+      if (writer != null) {
+         writer.getRawXMLWriter().writeStartDocument(XmlDataConstants.XML_VERSION);
+         writer.getRawXMLWriter().writeStartElement(XmlDataConstants.MESSAGES_PARENT);
+      }
+
+      if (reader != null) {
+         // <messages>
+         reader.getRawXMLReader().nextTag();
+      }
+   }
+
+   /**
+    * Closes the document when an output is configured: ends the parent element and the document,
+    * then flushes and closes the XML writer and flushes the underlying stream. The underlying
+    * stream itself is not closed here.
+    *
+    * @throws Exception if writing or flushing fails
+    */
+   @Override
+   public synchronized void stop() throws Exception {
+      if (writer != null) {
+         writer.getRawXMLWriter().writeEndElement();
+         writer.getRawXMLWriter().writeEndDocument();
+         writer.getRawXMLWriter().flush();
+         writer.getRawXMLWriter().close();
+         out.flush();
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
new file mode 100644
index 0000000..df79104
--- /dev/null
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.cli.Artemis;
+import org.apache.activemq.artemis.cli.commands.Run;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Round-trip tests for the CLI {@code consumer} and {@code producer} commands used with
+ * {@code --data}: messages are sent to a broker, written to a file by the consumer command,
+ * loaded back by the producer command, and compared with what was originally sent. Covers text,
+ * object and map messages as well as {@code fqqn://} destinations.
+ */
+public class MessageSerializerTest extends CliTestBase {
+
+   private Connection connection;
+
+   @Before
+   @Override
+   public void setup() throws Exception {
+      setupAuth();
+      super.setup();
+      startServer();
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connection = cf.createConnection("admin", "admin");
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         // setup() can fail before the connection exists; a NullPointerException here would hide that failure.
+         if (connection != null) {
+            connection.close();
+         }
+      } finally {
+         try {
+            stopServer();
+         } finally {
+            // Always let the base class clean up, even when the broker did not stop cleanly.
+            super.tearDown();
+         }
+      }
+   }
+
+   private void setupAuth() throws Exception {
+      setupAuth(temporaryFolder.getRoot());
+   }
+
+   /** Points the JAAS login configuration at {@code etc/login.config} below the given folder. */
+   private void setupAuth(File folder) throws Exception {
+      System.setProperty("java.security.auth.login.config", folder.getAbsolutePath() + "/etc/login.config");
+   }
+
+   /** Creates a broker instance under the temporary folder and runs it embedded in this JVM. */
+   private void startServer() throws Exception {
+      File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
+      setupAuth(rootDirectory);
+      Run.setEmbedded(true);
+      Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login");
+      System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
+      Artemis.internalExecute("run");
+   }
+
+   private void stopServer() throws Exception {
+      Artemis.internalExecute("stop");
+      assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
+      assertEquals(0, LibaioContext.getTotalMaxIO());
+   }
+
+   private File createMessageFile() throws IOException {
+      return temporaryFolder.newFile("messages.xml");
+   }
+
+   @Test
+   public void testTextMessageImportExport() throws Exception {
+      String address = "test";
+      int noMessages = 10;
+      File file = createMessageFile();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      List<Message> sent = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         sent.add(session.createTextMessage(RandomUtil.randomString()));
+      }
+
+      sendMessages(session, address, sent);
+      exportMessages(address, noMessages, file);
+      assertNothingLeftToConsume(session, address);
+
+      importMessages(address, file);
+
+      List<Message> received = consumeMessages(session, address, noMessages, false);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((TextMessage) sent.get(i)).getText(), ((TextMessage) received.get(i)).getText());
+      }
+   }
+
+   @Test
+   public void testObjectMessageImportExport() throws Exception {
+      String address = "test";
+      int noMessages = 10;
+      File file = createMessageFile();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      // Send initial messages.
+      List<Message> sent = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         sent.add(session.createObjectMessage(UUID.randomUUID()));
+      }
+
+      sendMessages(session, address, sent);
+      exportMessages(address, noMessages, file);
+      assertNothingLeftToConsume(session, address);
+
+      importMessages(address, file);
+
+      List<Message> received = consumeMessages(session, address, noMessages, false);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
+      }
+   }
+
+   @Test
+   public void testMapMessageImportExport() throws Exception {
+      String address = "test";
+      int noMessages = 10;
+      String key = "testKey";
+      File file = createMessageFile();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      List<Message> sent = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         MapMessage m = session.createMapMessage();
+         m.setString(key, RandomUtil.randomString());
+         sent.add(m);
+      }
+
+      sendMessages(session, address, sent);
+      exportMessages(address, noMessages, file);
+      assertNothingLeftToConsume(session, address);
+
+      importMessages(address, file);
+
+      List<Message> received = consumeMessages(session, address, noMessages, false);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
+      }
+   }
+
+   /**
+    * Asserts that a fresh consumer on the queue destination for {@code address} receives nothing
+    * within one second. The consumer is closed even when the assertion fails.
+    */
+   private void assertNothingLeftToConsume(Session session, String address) throws Exception {
+      try (MessageConsumer consumer = session.createConsumer(getDestination(address))) {
+         assertNull(consumer.receive(1000));
+      }
+   }
+
+   private void sendMessages(Session session, String address, List<Message> messages) throws Exception {
+      sendMessages(session, getDestination(address), messages);
+   }
+
+   /** Sends every message, in list order, to the destination and closes the producer afterwards. */
+   private void sendMessages(Session session, Destination destination, List<Message> messages) throws Exception {
+      try (MessageProducer producer = session.createProducer(destination)) {
+         for (Message m : messages) {
+            producer.send(m);
+         }
+      }
+   }
+
+   /**
+    * Receives exactly {@code noMessages} messages, failing if any single receive returns nothing
+    * within one second.
+    *
+    * @param fqqn when {@code true} the address is passed to {@code session.createQueue} as-is,
+    *             otherwise it is resolved through {@link #getDestination(String)}
+    */
+   private List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
+      Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
+
+      List<Message> messages = new ArrayList<>(noMessages);
+      try (MessageConsumer consumer = session.createConsumer(destination)) {
+         for (int i = 0; i < noMessages; i++) {
+            Message m = consumer.receive(1000);
+            assertNotNull(m);
+            messages.add(m);
+         }
+      }
+      return messages;
+   }
+
+   /** Runs the CLI {@code consumer} command against {@code address} with {@code --data} set to the output file. */
+   private void exportMessages(String address, int noMessages, File output) throws Exception {
+      Artemis.main("consumer",
+                   "--user", "admin",
+                   "--password", "admin",
+                   "--destination", address,
+                   "--message-count", "" + noMessages,
+                   "--data", output.getAbsolutePath());
+   }
+
+   /** Runs the CLI {@code producer} command against {@code address} with {@code --data} set to the input file. */
+   private void importMessages(String address, File input) throws Exception {
+      Artemis.main("producer",
+                   "--user", "admin",
+                   "--password", "admin",
+                   "--destination", address,
+                   "--data", input.getAbsolutePath());
+   }
+
+   private void createQueue(String routingTypeOption, String address, String queueName) throws Exception {
+      Artemis.main("queue", "create",
+                   "--user", "admin",
+                   "--password", "admin",
+                   "--address", address,
+                   "--name", queueName,
+                   routingTypeOption,
+                   "--durable",
+                   "--preserve-on-no-consumers",
+                   "--auto-create-address");
+   }
+
+   @Test
+   public void testSendDirectToQueue() throws Exception {
+
+      String address = "test";
+      String queue1Name = "queue1";
+      String queue2Name = "queue2";
+
+      createQueue("--multicast", address, queue1Name);
+      createQueue("--multicast", address, queue2Name);
+
+      // A dedicated connection, named so that it does not shadow the field opened in setup().
+      try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+           Connection directConnection = cf.createConnection("admin", "admin")) {
+
+         // send messages to queue
+         Session session = directConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         directConnection.start();
+
+         Destination queue1 = session.createQueue(address + "::" + queue1Name);
+         Destination queue2 = session.createQueue(address + "::" + queue2Name);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+
+         Artemis.main("producer",
+                      "--user", "admin",
+                      "--password", "admin",
+                      "--destination", "fqqn://" + address + "::" + queue1Name,
+                      "--message-count", "5");
+
+         // Only the queue named in the FQQN may receive the messages.
+         assertNull(consumer2.receive(1000));
+         assertNotNull(consumer1.receive(1000));
+      }
+   }
+
+   @Test
+   public void exportFromFQQN() throws Exception {
+      String addr = "address";
+      String queue = "queue";
+      String fqqn = addr + "::" + queue;
+      String destination = "fqqn://" + fqqn;
+
+      File file = createMessageFile();
+      int noMessages = 10;
+
+      createQueue("--multicast", addr, queue);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      Topic topic = session.createTopic(addr);
+
+      List<Message> messages = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         messages.add(session.createTextMessage(RandomUtil.randomString()));
+      }
+
+      sendMessages(session, topic, messages);
+
+      exportMessages(destination, noMessages, file);
+      importMessages(destination, file);
+
+      List<Message> received = consumeMessages(session, fqqn, noMessages, true);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) received.get(i)).getText());
+      }
+   }
+
+   // read individual lines from the captured stdout (or stderr) bytes
+   private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
+      byte[] bytes = errorOutput ? context.getStdErrBytes() : context.getStdoutBytes();
+
+      ArrayList<String> lines = new ArrayList<>();
+      try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)))) {
+         String currentLine = bufferedReader.readLine();
+         while (currentLine != null) {
+            lines.add(currentLine);
+            currentLine = bufferedReader.readLine();
+         }
+      }
+
+      return lines;
+   }
+
+   /** Sends the same sample text message {@code messageCount} times to the queue destination for {@code queueName}. */
+   private void sendMessages(Session session, String queueName, int messageCount) throws JMSException {
+      try (MessageProducer producer = session.createProducer(getDestination(queueName))) {
+         TextMessage message = session.createTextMessage(getTestMessageBody());
+
+         for (int i = 0; i < messageCount; i++) {
+            producer.send(message);
+         }
+      }
+   }
+
+   private String getTestMessageBody() {
+      return "Sample Message";
+   }
+
+   private Destination getDestination(String queueName) {
+      return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
+   }
+}
