Repository: activemq-artemis Updated Branches: refs/heads/master b6a29a5b5 -> 309729bbd
ARTEMIS-1777 Adding Protocol specific into producer / consumer Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c2955af1 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c2955af1 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c2955af1 Branch: refs/heads/master Commit: c2955af16415f1479a002c8238d78410fd27e36b Parents: b6a29a5 Author: Clebert Suconic <[email protected]> Authored: Thu Mar 29 11:37:56 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 29 15:21:38 2018 -0400 ---------------------------------------------------------------------- artemis-cli/pom.xml | 6 +++ .../artemis/cli/commands/AbstractAction.java | 4 +- .../artemis/cli/commands/messages/Browse.java | 7 ++- .../commands/messages/ConnectionAbstract.java | 45 +++++++++++++++++++- .../artemis/cli/commands/messages/Consumer.java | 8 ++-- .../cli/commands/messages/ConsumerThread.java | 8 +++- .../cli/commands/messages/DestAbstract.java | 12 ++++++ .../artemis/cli/commands/messages/Producer.java | 7 ++- artemis-distribution/pom.xml | 10 ++++- artemis-distribution/src/main/assembly/dep.xml | 1 + 10 files changed, 90 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-cli/pom.xml b/artemis-cli/pom.xml index 7870c59..88a48db 100644 --- a/artemis-cli/pom.xml +++ b/artemis-cli/pom.xml @@ -94,6 +94,12 @@ <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-json_1.0_spec</artifactId> </dependency> + <!-- artemis producer and consumer can use amqp as the protocol --> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-jms-client</artifactId> + <version>${qpid.jms.version}</version> + </dependency> <dependency> <groupId>org.jboss.logging</groupId> <artifactId>jboss-logging-annotations</artifactId> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java index 3619ed7..37f08c3 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java @@ -30,8 +30,8 @@ public abstract class AbstractAction extends ConnectionAbstract { public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception { - try (ActiveMQConnectionFactory factory = createConnectionFactory(); - ServerLocator locator = factory.getServerLocator(); + try (ActiveMQConnectionFactory factory = createCoreConnectionFactory(); + ServerLocator locator = factory.getServerLocator(); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) { session.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java index 9562b59..e249cbf 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java @@ -18,14 +18,13 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Session; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @Command(name = "browser", description = "It will browse messages on an instance") public class Browse extends DestAbstract { @@ -39,9 +38,8 @@ public class Browse extends DestAbstract { System.out.println("Consumer:: filter = " + filter); - ActiveMQConnectionFactory factory = createConnectionFactory(); + ConnectionFactory factory = createConnectionFactory(); - Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE); try (Connection connection = factory.createConnection()) { ConsumerThread[] threadsArray = new ConsumerThread[threads]; for (int i = 0; i < threads; i++) { @@ -51,6 +49,7 @@ public class Browse extends DestAbstract { } else { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } + Destination dest = lookupDestination(session); threadsArray[i] = new ConsumerThread(session, dest, i); threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java index 41443c4..90882e6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java @@ -18,12 +18,14 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.JMSSecurityException; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.InputAbstract; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.qpid.jms.JmsConnectionFactory; public class ConnectionAbstract extends InputAbstract { @@ -39,7 +41,48 @@ public class ConnectionAbstract extends InputAbstract { @Option(name = "--clientID", description = "ClientID to be associated with connection") String clientID; - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + @Option(name = "--protocol", description = "Protocol used. Valid values are amqp or core. Default=core.") + String protocol = "core"; + + protected ConnectionFactory createConnectionFactory() throws Exception { + if (protocol.equals("core")) { + return createCoreConnectionFactory(); + } else if (protocol.equals("amqp")) { + return createAMQPConnectionFactory(); + } else { + throw new IllegalStateException("protocol " + protocol + " not supported"); + } + } + + private ConnectionFactory createAMQPConnectionFactory() { + if (brokerURL.startsWith("tcp://")) { + // replacing tcp:// by amqp:// + brokerURL = "amqp" + brokerURL.substring(3); + } + JmsConnectionFactory cf = new JmsConnectionFactory(user, password, brokerURL); + if (clientID != null) { + cf.setClientID(clientID); + } + + try { + Connection connection = cf.createConnection(); + connection.close(); + return cf; + } catch (JMSSecurityException e) { + // if a security exception will get the user and password through an input + context.err.println("Connection failed::" + e.getMessage()); + userPassword(); + return new JmsConnectionFactory(user, password, brokerURL); + } catch (JMSException e) { + // if a connection exception will ask for the URL, user and password + context.err.println("Connection failed::" + e.getMessage()); + brokerURL = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", brokerURL); + userPassword(); + return new JmsConnectionFactory(user, password, brokerURL); + } + } + + protected ActiveMQConnectionFactory createCoreConnectionFactory() { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password); if (clientID != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java index c58f792..ee15a66 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java @@ -18,14 +18,13 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Session; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @Command(name = "consumer", description = "It will consume messages from an instance") public class Consumer extends DestAbstract { @@ -48,9 +47,8 @@ public class Consumer extends DestAbstract { System.out.println("Consumer:: filter = " + filter); - ActiveMQConnectionFactory factory = createConnectionFactory(); + ConnectionFactory factory = createConnectionFactory(); - Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE); try (Connection connection = factory.createConnection()) { ConsumerThread[] threadsArray = new ConsumerThread[threads]; for (int i = 0; i < threads; i++) { @@ -60,6 +58,7 @@ public class Consumer extends DestAbstract { } else { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } + Destination dest = lookupDestination(session); threadsArray[i] = new ConsumerThread(session, dest, i); threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false); @@ -82,4 +81,5 @@ public class Consumer extends DestAbstract { } } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java index 7883e58..ecffa34 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -146,12 +146,16 @@ public class ConsumerThread extends Thread { consumer = session.createConsumer(destination); } } + int count = 0; while (running && received < messageCount) { Message msg = consumer.receive(receiveTimeOut); if (msg != null) { - System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); if (verbose) { - System.out.println("..." + msg); + System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); + } else { + if (++count % 1000 == 0) { + System.out.println("Received " + count); + } } if (bytesAsText && (msg instanceof BytesMessage)) { long length = ((BytesMessage) msg).getBodyLength(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java index acf0473..2f4a34c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java @@ -17,7 +17,11 @@ package org.apache.activemq.artemis.cli.commands.messages; +import javax.jms.Destination; +import javax.jms.Session; + import io.airlift.airline.Option; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; public class DestAbstract extends ConnectionAbstract { @@ -36,4 +40,12 @@ public class DestAbstract extends ConnectionAbstract { @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)") int threads = 1; + protected Destination lookupDestination(Session session) throws Exception { + if (protocol.equals("AMQP")) { + return session.createQueue(destination); + } else { + return ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java index 3ed4b57..e077fb0 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java @@ -18,14 +18,13 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Session; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @Command(name = "producer", description = "It will send messages to an instance") public class Producer extends DestAbstract { @@ -49,9 +48,8 @@ public class Producer extends DestAbstract { public Object execute(ActionContext context) throws Exception { super.execute(context); - ActiveMQConnectionFactory factory = createConnectionFactory(); + ConnectionFactory factory = createConnectionFactory(); - Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE); try (Connection connection = factory.createConnection()) { ProducerThread[] threadsArray = new ProducerThread[threads]; for (int i = 0; i < threads; i++) { @@ -61,6 +59,7 @@ public class Producer extends DestAbstract { } else { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } + Destination dest = lookupDestination(session); threadsArray[i] = new ProducerThread(session, dest, i); threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-distribution/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 84116a4..6565430 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -170,7 +170,15 @@ <artifactId>tomcat-servlet-api</artifactId> </dependency> - <!-- Management Console Dependencies --> + <!-- for artemis cli producer/consumer --> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-jms-client</artifactId> + <version>${qpid.jms.version}</version> + </dependency> + + + <!-- Management Console Dependencies --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-console</artifactId> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-distribution/src/main/assembly/dep.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index f809713..aad4ab1 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -69,6 +69,7 @@ <include>org.apache.activemq:artemis-service-extensions</include> <include>org.apache.activemq:artemis-web</include> <include>org.apache.activemq.rest:artemis-rest</include> + <include>org.apache.qpid:qpid-jms-client</include> <!-- dependencies --> <include>org.apache.geronimo.specs:geronimo-jms_2.0_spec</include>
