This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 7ad53e5 ARTEMIS-2645 refactor CLI FQQN support
new 3bcaaab This closes #3003
7ad53e5 is described below
commit 7ad53e57487f58e895aa7188fc1a89096ec3714f
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Mar 4 22:12:28 2020 -0600
ARTEMIS-2645 refactor CLI FQQN support
FQQN support for the CLI was implemented via ARTEMIS-1840 before general
FQQN support was added for producers via ARTEMIS-1867. The CLI's FQQN
functionality is slightly different from what is now generally available
and it can be confusing for users. By refactoring the CLI to use the
general FQQN support the code can be much simpler and consistent with
the expected behavior. Refactoring includes:
- Deprecating the use of "fqqn://". The CLI commands use JMS so using
"fqqn://" (instead of "queue://" or "topic://") makes the destination
type ambiguous which can yield unexpected message routing behavior.
Now "queue://" and "topic://" can be used with the normal FQQN syntax
(e.g. address::queue).
- Eliminating the use of the _AMQ_ROUTE_TO header when sending messags
to an FQQN. The _AMQ_ROUTE_TO header is an internal header used when
routing messages over a cluster bridge. Using it in the CLI for FQQN
support was a clever hack, but using the general FQQN support
eliminates complexity and makes behavior consistent between
standalone JMS clients using FQQN and the CLI.
- De-duplicating MessageSerializer initialization boilerplate.
- Removing limitation where using an FQQN with an anycast address
required the same name for the address and queue.
---
.../artemis/cli/commands/messages/Browse.java | 14 +--
.../artemis/cli/commands/messages/Consumer.java | 47 ++++-----
.../cli/commands/messages/DestAbstract.java | 109 +++++----------------
.../artemis/cli/commands/messages/Producer.java | 76 ++++++--------
.../cli/commands/messages/ProducerThread.java | 10 --
.../apache/activemq/cli/test/CliProducerTest.java | 5 +-
.../activemq/cli/test/MessageSerializerTest.java | 104 ++++++++------------
7 files changed, 125 insertions(+), 240 deletions(-)
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
index 7af3713..facd34d 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
@@ -49,10 +49,15 @@ public class Browse extends DestAbstract {
} else {
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
- Destination dest = lookupDestination(session);
+ Destination dest = getDestination(session);
threadsArray[i] = new ConsumerThread(session, dest, i);
-
threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true);
+ threadsArray[i]
+ .setVerbose(verbose)
+ .setSleep(sleep)
+ .setMessageCount(messageCount)
+ .setFilter(filter)
+ .setBrowse(true);
}
for (ConsumerThread thread : threadsArray) {
@@ -69,11 +74,6 @@ public class Browse extends DestAbstract {
}
return received;
- } finally {
- if (factory instanceof AutoCloseable) {
- ((AutoCloseable) factory).close();
- }
}
}
-
}
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index c756e60..71eac78 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -56,30 +56,25 @@ public class Consumer extends DestAbstract {
System.out.println("Consumer:: filter = " + filter);
SerialiserMessageListener listener = null;
- MessageSerializer messageSerializer = null;
+ MessageSerializer serializer = null;
if (file != null) {
- try {
- String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER
: serializer;
- if (className.equals(DEFAULT_MESSAGE_SERIALIZER) &&
!protocol.equalsIgnoreCase("CORE")) {
- System.err.println("Default Serializer does not support: " +
protocol + " protocol");
- return null;
- }
- messageSerializer = (MessageSerializer)
Class.forName(className).getConstructor().newInstance();
- } catch (Exception e) {
- System.err.println("Error. Unable to instantiate serializer class:
" + serializer);
+ serializer = getMessageSerializer();
+ if (serializer == null) {
+ System.err.println("Error. Unable to instantiate serializer class:
" + this.serializer);
return null;
}
+ OutputStream out;
try {
- OutputStream out = new FileOutputStream(file);
- listener = new SerialiserMessageListener(messageSerializer, out);
+ out = new FileOutputStream(file);
} catch (Exception e) {
System.err.println("Error: Unable to open file for writing\n" +
e.getMessage());
return null;
}
- }
- if (messageSerializer != null) messageSerializer.start();
+ listener = new SerialiserMessageListener(serializer, out);
+ serializer.start();
+ }
ConnectionFactory factory = createConnectionFactory();
@@ -94,12 +89,20 @@ public class Consumer extends DestAbstract {
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
- // Do validation on FQQN
- Destination dest = isFQQN() ?
session.createQueue(getFQQNFromDestination(destination)) :
lookupDestination(session);
+ Destination dest = getDestination(session);
threadsArray[i] = new ConsumerThread(session, dest, i);
-
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
-
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener);
+ threadsArray[i]
+ .setVerbose(verbose)
+ .setSleep(sleep)
+ .setDurable(durable)
+ .setBatchSize(txBatchSize)
+ .setBreakOnNull(breakOnNull)
+ .setMessageCount(messageCount)
+ .setReceiveTimeOut(receiveTimeout)
+ .setFilter(filter)
+ .setBrowse(false)
+ .setListener(listener);
}
for (ConsumerThread thread : threadsArray) {
@@ -115,13 +118,11 @@ public class Consumer extends DestAbstract {
received += thread.getReceived();
}
- if (messageSerializer != null) messageSerializer.stop();
+ if (serializer != null) {
+ serializer.stop();
+ }
return received;
- } finally {
- if (factory instanceof AutoCloseable) {
- ((AutoCloseable) factory).close();
- }
}
}
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
index ff6f71a..0611ec5 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
@@ -18,33 +18,17 @@
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Session;
-import java.nio.ByteBuffer;
import io.airlift.airline.Option;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientRequestor;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
public class DestAbstract extends ConnectionAbstract {
- public static final String DEFAULT_MESSAGE_SERIALIZER =
"org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer";
-
- private static final String FQQN_PREFIX = "fqqn://";
-
- private static final String FQQN_SEPERATOR = "::";
-
- @Option(name = "--destination", description = "Destination to be used. It
can be prefixed with queue:// or topic:// or fqqn:// (Default: queue://TEST)")
+ @Option(name = "--destination", description = "Destination to be used. It
can be prefixed with queue:// or topic:// and can be an FQQN in the form of
<address>::<queue>. (Default: queue://TEST)")
String destination = "queue://TEST";
@Option(name = "--message-count", description = "Number of messages to act
on (Default: 1000)")
@@ -62,86 +46,37 @@ public class DestAbstract extends ConnectionAbstract {
@Option(name = "--serializer", description = "Override the default
serializer with a custom implementation")
String serializer;
- protected boolean isFQQN() throws ActiveMQException {
- boolean fqqn = destination.contains("::");
- if (fqqn) {
- if (!destination.startsWith("fqqn://")) {
- throw new ActiveMQException("FQQN destinations must start with the
fqqn:// prefix");
- }
-
- if (protocol.equalsIgnoreCase("AMQP")) {
- throw new ActiveMQException("Sending to FQQN destinations is not
support via AMQP protocol");
+ protected MessageSerializer getMessageSerializer() {
+ if (serializer != null) {
+ try {
+ return (MessageSerializer)
Class.forName(serializer).getConstructor().newInstance();
+ } catch (Exception e) {
+ System.err.println("Error: unable to instantiate serializer class:
" + serializer);
+ System.err.println("Defaulting to: " +
XMLMessageSerializer.class.getName());
}
- return true;
- } else {
- return false;
}
- }
- protected Destination lookupDestination(Session session) throws Exception {
- if (protocol.equals("AMQP")) {
- return session.createQueue(destination);
- } else {
- return ActiveMQDestination.createDestination(this.destination,
ActiveMQDestination.TYPE.QUEUE);
+ if (!protocol.equalsIgnoreCase("CORE")) {
+ System.err.println("Default Serializer does not support: " + protocol
+ " protocol");
+ return null;
}
- }
- protected MessageSerializer getMessageSerializer() {
- if (serializer == null) return new XMLMessageSerializer();
- try {
- return (MessageSerializer)
Class.forName(serializer).getConstructor().newInstance();
- } catch (Exception e) {
- System.out.println("Error: unable to instantiate serializer class: "
+ serializer);
- System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER);
- }
return new XMLMessageSerializer();
}
- public byte[] getQueueIdFromName(String queueName) throws Exception {
- try {
- ClientMessage message = getQueueAttribute(queueName, "ID");
- Number idObject = (Number) ManagementHelper.getResult(message);
- ByteBuffer byteBuffer = ByteBuffer.allocate(8);
- byteBuffer.putLong(idObject.longValue());
- return byteBuffer.array();
- } catch (Exception e) {
- throw new ActiveMQException("Error occured when looking up FQQN.
Please ensure the FQQN exists.", e, ActiveMQExceptionType.ILLEGAL_STATE);
+ protected Destination getDestination(Session session) throws JMSException {
+ if (destination.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
+ return session.createTopic(stripPrefix(destination));
}
+ return session.createQueue(stripPrefix(destination));
}
- protected ClientMessage getQueueAttribute(String queueName, String
attribute) throws Exception {
- try (ServerLocator serverLocator =
ActiveMQClient.createServerLocator(brokerURL)) {
- try (ClientSessionFactory sf = serverLocator.createSessionFactory()) {
- ClientSession managementSession;
- if (user != null || password != null) {
- managementSession = sf.createSession(user, password, false,
true, true, false, 0);
- } else {
- managementSession = sf.createSession(false, true, true);
- }
- managementSession.start();
-
- try (ClientRequestor requestor = new
ClientRequestor(managementSession, "activemq.management")) {
- ClientMessage managementMessage =
managementSession.createMessage(false);
- ManagementHelper.putAttribute(managementMessage,
ResourceNames.QUEUE + queueName, attribute);
- managementSession.start();
- ClientMessage reply = requestor.request(managementMessage);
- return reply;
- } finally {
- managementSession.stop();
- }
- }
+ private String stripPrefix(String destination) {
+ int index = destination.indexOf("://");
+ if (index != -1) {
+ return destination.substring(index + 3);
+ } else {
+ return destination;
}
}
-
- protected String getQueueFromFQQN(String fqqn) {
- return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) +
FQQN_SEPERATOR.length());
- }
-
- protected String getAddressFromFQQN(String fqqn) {
- return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(),
fqqn.indexOf(FQQN_SEPERATOR));
- }
-
- protected String getFQQNFromDestination(String destination) {
- return destination.substring(destination.indexOf(FQQN_PREFIX) +
FQQN_PREFIX.length());
- }
}
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
index 92e7634..442017c 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
@@ -25,15 +25,12 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.FileInputStream;
+import java.io.InputStream;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
-import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
@Command(name = "producer", description = "It will send messages to an
instance")
public class Producer extends DestAbstract {
@@ -72,16 +69,10 @@ public class Producer extends DestAbstract {
try (Connection connection = factory.createConnection()) {
- byte[] queueId = null;
- boolean isFQQN = isFQQN();
- if (isFQQN) {
- queueId = getQueueIdFromName(getQueueFromFQQN(destination));
- }
-
// If we are reading from file, we process messages sequentially to
guarantee ordering. i.e. no thread creation.
if (fileName != null) {
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
- Destination dest = lookupDestination(session, isFQQN);
+ Destination dest = getDestination(session);
MessageProducer producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -89,13 +80,25 @@ public class Producer extends DestAbstract {
int messageCount = 0;
try {
MessageSerializer serializer = getMessageSerializer();
- serializer.setInput(new FileInputStream(fileName), session);
+ if (serializer == null) {
+ System.err.println("Error. Unable to instantiate serializer
class: " + serializer);
+ return null;
+ }
+
+ InputStream in;
+ try {
+ in = new FileInputStream(fileName);
+ } catch (Exception e) {
+ System.err.println("Error: Unable to open file for
reading\n" + e.getMessage());
+ return null;
+ }
+
+ serializer.setInput(in, session);
serializer.start();
Message message = serializer.read();
while (message != null) {
- if (queueId != null) ((ActiveMQMessage)
message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS,
queueId);
producer.send(message);
message = serializer.read();
messageCount++;
@@ -120,13 +123,21 @@ public class Producer extends DestAbstract {
} else {
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
- Destination dest = lookupDestination(session, isFQQN);
+ Destination dest = getDestination(session);
threadsArray[i] = new ProducerThread(session, dest, i);
-
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
-
setMessageSize(messageSize).setTextMessageSize(textMessageSize).setMessage(message).setObjectSize(objectSize).
-
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
- setMessageCount(messageCount).setQueueId(queueId);
+ threadsArray[i]
+ .setVerbose(verbose)
+ .setSleep(sleep)
+ .setPersistent(!nonpersistent)
+ .setMessageSize(messageSize)
+ .setTextMessageSize(textMessageSize)
+ .setMessage(message)
+ .setObjectSize(objectSize)
+ .setMsgTTL(msgTTL)
+ .setMsgGroupID(msgGroupID)
+ .setTransactionBatchSize(txBatchSize)
+ .setMessageCount(messageCount);
}
for (ProducerThread thread : threadsArray) {
@@ -140,35 +151,6 @@ public class Producer extends DestAbstract {
}
return messagesProduced;
}
- } finally {
- if (factory instanceof AutoCloseable) {
- ((AutoCloseable) factory).close();
- }
}
}
-
- public Destination lookupDestination(Session session, boolean isFQQN)
throws Exception {
- Destination dest;
- if (!isFQQN) {
- dest = lookupDestination(session);
- } else {
- String address = getAddressFromFQQN(destination);
- if (isFQQNAnycast(getQueueFromFQQN(destination))) {
- String queue = getQueueFromFQQN(destination);
- if (!queue.equals(address)) {
- throw new ActiveMQException("FQQN support is limited to Anycast
queues where the queue name equals the address.");
- }
- dest = session.createQueue(address);
- } else {
- dest = session.createTopic(address);
- }
- }
- return dest;
- }
-
- protected boolean isFQQNAnycast(String queueName) throws Exception {
- ClientMessage message = getQueueAttribute(queueName, "RoutingType");
- String routingType = (String) ManagementHelper.getResult(message);
- return routingType.equalsIgnoreCase("anycast");
- }
}
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
index e3a4a23..19fbba9 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
@@ -30,7 +30,6 @@ import java.io.InputStreamReader;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.utils.ReusableLatch;
public class ProducerThread extends Thread {
@@ -49,7 +48,6 @@ public class ProducerThread extends Thread {
long msgTTL = 0L;
String msgGroupID = null;
int transactionBatchSize;
- byte[] queueId = null;
int transactions = 0;
final AtomicInteger sentCount = new AtomicInteger(0);
@@ -124,10 +122,6 @@ public class ProducerThread extends Thread {
private void sendMessage(MessageProducer producer, String threadName)
throws Exception {
Message message = createMessage(sentCount.get(), threadName);
- if (queueId != null) {
- ((ActiveMQMessage)
message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS,
queueId);
- }
-
producer.send(message);
if (verbose) {
System.out.println(threadName + " Sent: " + (message instanceof
TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
@@ -377,8 +371,4 @@ public class ProducerThread extends Thread {
this.objectSize = objectSize;
return this;
}
-
- public void setQueueId(byte[] queueId) {
- this.queueId = queueId;
- }
}
diff --git
a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
index 85d6727..0f39a1f 100644
---
a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
+++
b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.cli.test;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -73,10 +74,8 @@ public class CliProducerTest extends CliTestBase {
private void checkSentMessages(Session session, String address, String
messageBody) throws Exception {
final boolean isCustomMessageBody = messageBody != null;
- boolean fqqn = false;
- if (address.contains("::")) fqqn = true;
- List<Message> received = consumeMessages(session, address,
TEST_MESSAGE_COUNT, fqqn);
+ List<Message> received = consumeMessages(session, address,
TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
if (!isCustomMessageBody) messageBody = "test message: " +
String.valueOf(i);
assertEquals(messageBody, ((TextMessage) received.get(i)).getText());
diff --git
a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
index aa2e76a..51cc3e5 100644
---
a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
+++
b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
@@ -27,17 +27,18 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.After;
import org.junit.Before;
@@ -100,10 +101,7 @@ public class MessageSerializerTest extends CliTestBase {
}
private void checkSentMessages(Session session, List<Message> messages,
String address) throws Exception {
- boolean fqqn = false;
- if (address.contains("::")) fqqn = true;
-
- List<Message> recieved = consumeMessages(session, address,
TEST_MESSAGE_COUNT, fqqn);
+ List<Message> recieved = consumeMessages(session, address,
TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((TextMessage) messages.get(i)).getText(),
((TextMessage) recieved.get(i)).getText());
}
@@ -128,26 +126,18 @@ public class MessageSerializerTest extends CliTestBase {
}
private void exportMessages(String address, int noMessages, boolean
durable, String clientId, File output) throws Exception {
+ List<String> args = new ArrayList<>(Arrays.asList("consumer",
+ "--user", "admin",
+ "--password", "admin",
+ "--destination",
address,
+ "--message-count",
Integer.toString(noMessages),
+ "--data",
output.getAbsolutePath(),
+ "--clientID",
clientId));
if (durable) {
- String[] args = {"consumer",
- "--user", "admin",
- "--password", "admin",
- "--destination", address,
- "--message-count", Integer.toString(noMessages),
- "--data", output.getAbsolutePath(),
- "--clientID", clientId,
- "--durable"};
- Artemis.main(args);
- } else {
- String[] args = {"consumer",
- "--user", "admin",
- "--password", "admin",
- "--destination", address,
- "--message-count", Integer.toString(noMessages),
- "--data", output.getAbsolutePath(),
- "--clientID", clientId};
- Artemis.main(args);
+ args.add("--durable");
}
+
+ Artemis.main(args.toArray(new String[0]));
}
private void importMessages(String address, File input) throws Exception {
@@ -246,22 +236,31 @@ public class MessageSerializerTest extends CliTestBase {
}
@Test
- public void testSendDirectToQueue() throws Exception {
+ public void testSendDirectToMulticastQueue() throws Exception {
+ internalTestSendDirectToQueue(RoutingType.MULTICAST);
+ }
+
+ @Test
+ public void testSendDirectToAnycastQueue() throws Exception {
+ internalTestSendDirectToQueue(RoutingType.ANYCAST);
+ }
+
+ private void internalTestSendDirectToQueue(RoutingType routingType) throws
Exception {
String address = "test";
String queue1Name = "queue1";
String queue2Name = "queue2";
- createQueue("--multicast", address, queue1Name);
- createQueue("--multicast", address, queue2Name);
+ createQueue("--" + routingType.toString().toLowerCase(), address,
queue1Name);
+ createQueue("--" + routingType.toString().toLowerCase(), address,
queue2Name);
try (ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection =
cf.createConnection("admin", "admin");) {
// send messages to queue
Session session = createSession(connection);
- Destination queue1 = session.createQueue(address + "::" + queue1Name);
- Destination queue2 = session.createQueue(address + "::" + queue2Name);
+ Destination queue1 =
session.createQueue(CompositeAddress.toFullyQualified(address, queue1Name));
+ Destination queue2 =
session.createQueue(CompositeAddress.toFullyQualified(address, queue2Name));
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
@@ -269,7 +268,7 @@ public class MessageSerializerTest extends CliTestBase {
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
- "--destination", "fqqn://" + address + "::" + queue1Name,
+ "--destination", (routingType == RoutingType.ANYCAST ?
ActiveMQDestination.QUEUE_QUALIFIED_PREFIX :
ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) +
CompositeAddress.toFullyQualified(address, queue1Name),
"--message-count", "5");
assertNull(consumer2.receive(1000));
@@ -281,8 +280,8 @@ public class MessageSerializerTest extends CliTestBase {
public void exportFromFQQN() throws Exception {
String addr = "address";
String queue = "queue";
- String fqqn = addr + "::" + queue;
- String destination = "fqqn://" + fqqn;
+ String fqqn = CompositeAddress.toFullyQualified(addr, queue);
+ String destination = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + fqqn;
File file = createMessageFile();
@@ -294,7 +293,7 @@ public class MessageSerializerTest extends CliTestBase {
List<Message> messages = generateTextMessages(session, topic);
- exportMessages(destination, file);
+ exportMessages(fqqn, file);
importMessages(destination, file);
checkSentMessages(session, messages, fqqn);
@@ -317,7 +316,7 @@ public class MessageSerializerTest extends CliTestBase {
List<Message> messages = generateTextMessages(session, aAddress);
exportMessages(aAddress, file);
- importMessages("topic://" + mAddress, file);
+ importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + mAddress,
file);
checkSentMessages(session, messages, queueM1Name);
checkSentMessages(session, messages, queueM2Name);
@@ -329,8 +328,8 @@ public class MessageSerializerTest extends CliTestBase {
String aAddress = "testAnycast";
String queueM1Name = "queueM1";
String queueM2Name = "queueM2";
- String fqqnMulticast1 = mAddress + "::" + queueM1Name;
- String fqqnMulticast2 = mAddress + "::" + queueM2Name;
+ String fqqnMulticast1 = CompositeAddress.toFullyQualified(mAddress,
queueM1Name);
+ String fqqnMulticast2 = CompositeAddress.toFullyQualified(mAddress,
queueM2Name);
File file = createMessageFile();
@@ -342,7 +341,7 @@ public class MessageSerializerTest extends CliTestBase {
List<Message> messages = generateTextMessages(session, aAddress);
exportMessages(aAddress, file);
- importMessages("fqqn://" + fqqnMulticast1, file);
+ importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX +
fqqnMulticast1, file);
checkSentMessages(session, messages, fqqnMulticast1);
@@ -359,13 +358,13 @@ public class MessageSerializerTest extends CliTestBase {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
createBothTypeAddress(address);
- exportMessages("topic://" + address, 0, true, clientId, file);
+ exportMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, 0,
true, clientId, file);
connection.start();
List<Message> messages = generateTextMessages(session,
getTopicDestination(address));
- exportMessages("topic://" + address, TEST_MESSAGE_COUNT, true, clientId,
file);
+ exportMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address,
TEST_MESSAGE_COUNT, true, clientId, file);
importMessages(address, file);
@@ -390,32 +389,11 @@ public class MessageSerializerTest extends CliTestBase {
exportMessages(address, file);
- importMessages("topic://" + address, file);
+ importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address,
file);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
TextMessage messageReceived = (TextMessage) subscriber.receive(1000);
+ assertNotNull(messageReceived);
assertEquals(((TextMessage) messages.get(i)).getText(),
messageReceived.getText());
}
}
-
- //read individual lines from byteStream
- private ArrayList<String> getOutputLines(TestActionContext context, boolean
errorOutput) throws IOException {
- byte[] bytes;
-
- if (errorOutput) {
- bytes = context.getStdErrBytes();
- } else {
- bytes = context.getStdoutBytes();
- }
- BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(new ByteArrayInputStream(bytes)));
- ArrayList<String> lines = new ArrayList<>();
-
- String currentLine = bufferedReader.readLine();
- while (currentLine != null) {
- lines.add(currentLine);
- currentLine = bufferedReader.readLine();
- }
-
- return lines;
- }
-
}