Repository: activemq-artemis
Updated Branches:
  refs/heads/master 077a416ee -> c6cba4088


ARTEMIS-144 Producer / Consumer command

https://issues.apache.org/jira/browse/ARTEMIS-144

As I was working through the examples I realized the tool used to consume and
send messages through the console.

This will import that tool into the CLI, as users are used to consuming and
sending messages.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cf777ec6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cf777ec6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cf777ec6

Branch: refs/heads/master
Commit: cf777ec6b69c4ebe872746e8f984dde3836ce0b2
Parents: 077a416
Author: Clebert Suconic <[email protected]>
Authored: Mon Jun 22 16:20:29 2015 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Tue Jun 23 20:40:27 2015 -0400

----------------------------------------------------------------------
 .../apache/activemq/artemis/cli/Artemis.java    |   6 +-
 .../activemq/artemis/cli/commands/Consumer.java |  88 ++++
 .../activemq/artemis/cli/commands/Create.java   |   7 +-
 .../artemis/cli/commands/DestAbstract.java      |  51 +++
 .../activemq/artemis/cli/commands/Producer.java |  92 ++++
 .../activemq/artemis/cli/commands/Run.java      |  23 +-
 .../cli/commands/util/ConsumerThread.java       | 268 ++++++++++++
 .../cli/commands/util/ProducerThread.java       | 431 +++++++++++++++++++
 .../activemq/artemis/cli/commands/util/demo.txt |  15 +
 .../activemq/artemis/test/ArtemisTest.java      |  41 ++
 artemis-distribution/src/main/assembly/dep.xml  |   1 +
 .../artemis/jms/client/ActiveMQDestination.java |  56 ++-
 .../artemis/jms/client/ActiveMQQueue.java       |   5 +
 .../artemis/jms/client/ActiveMQTopic.java       |   7 +-
 14 files changed, 1083 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java
index 137d36c..636d729 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java
@@ -22,9 +22,11 @@ import java.io.OutputStream;
 import io.airlift.airline.Cli;
 import org.apache.activemq.artemis.cli.commands.Action;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.commands.Consumer;
 import org.apache.activemq.artemis.cli.commands.Create;
 import org.apache.activemq.artemis.cli.commands.HelpAction;
 import org.apache.activemq.artemis.cli.commands.Kill;
+import org.apache.activemq.artemis.cli.commands.Producer;
 import org.apache.activemq.artemis.cli.commands.Run;
 import org.apache.activemq.artemis.cli.commands.Stop;
 import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
@@ -37,12 +39,14 @@ import 
org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
 public class Artemis
 {
    @SuppressWarnings("unchecked")
-   public static void main(String[] args) throws Exception
+   public static void main(String...args) throws Exception
    {
       String instance = System.getProperty("artemis.instance");
       Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis")
          .withDescription("ActiveMQ Artemis Command Line")
          .withCommand(HelpAction.class)
+         .withCommand(Producer.class)
+         .withCommand(Consumer.class)
          .withDefaultCommand(HelpAction.class);
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java
new file mode 100644
index 0000000..96b8455
--- /dev/null
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.cli.commands.util.ConsumerThread;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+
+@Command(name = "consume", description = "It will send consume messages from 
an instance")
+public class Consumer extends DestAbstract
+{
+
+
+   @Option(name = "--durable", description = "It will use durable subscription 
in case of client")
+   boolean durable = false;
+
+   @Option(name = "--breakOnNull", description = "It will break on null 
messages")
+   boolean breakOnNull = false;
+
+   @Option(name = "--receiveTimeout", description = "Time used on 
receive(timeout)")
+   int receiveTimeout;
+
+   @Override
+   public Object execute(ActionContext context) throws Exception
+   {
+      super.execute(context);
+
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerURL, user, password);
+
+      Destination dest = 
ActiveMQDestination.createDestination(this.destination, 
ActiveMQDestination.QUEUE_TYPE);
+      try (Connection connection = factory.createConnection())
+      {
+         ConsumerThread[] threadsArray = new ConsumerThread[threads];
+         for (int i = 0; i < threads; i++)
+         {
+            Session session;
+            if (txBatchSize > 0)
+            {
+               session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            }
+            else
+            {
+               session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            }
+            threadsArray[i] = new ConsumerThread(session, dest, i);
+
+            
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
+                          
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout);
+         }
+
+         for (ConsumerThread thread : threadsArray)
+         {
+            thread.start();
+         }
+
+         connection.start();
+
+         for (ConsumerThread thread : threadsArray)
+         {
+            thread.join();
+         }
+      }
+
+      return null;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index 8d5f87e..182b662 100644
--- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -248,7 +248,12 @@ public class Create extends InputAbstract
    {
       if (home == null)
       {
-         home = new File(System.getProperty("artemis.home"));
+         String homeStr = System.getProperty("artemis.home");
+         if (homeStr == null)
+         {
+            homeStr = ".";
+         }
+         home = new File(homeStr);
       }
       return home;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java
new file mode 100644
index 0000000..b54e7b8
--- /dev/null
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands;
+
+   import io.airlift.airline.Option;
+
+public class DestAbstract extends ActionAbstract
+{
+   @Option(name = "--url", description = "URL towards the broker. (default: 
tcp://localhost:61616)")
+   String brokerURL = "tcp://localhost:61616";
+
+   @Option(name = "--destination", description = "Destination to be used. it 
could be prefixed with queue:// or topic:: (Default: queue://TEST")
+   String destination = "queue://TEST";
+
+   @Option(name = "--messageCount", description = "Number of messages to act 
on (Default: 1000)")
+   int messageCount = 1000;
+
+   @Option(name = "--user", description = "User used to connect")
+   String user;
+
+   @Option(name = "--password", description = "Password used to connect")
+   String password;
+
+   @Option(name = "--verbose", description = "It will print messages 
individually")
+   boolean verbose;
+
+   @Option(name = "--sleep", description = "Time wait between each message")
+   int sleep = 0;
+
+   @Option(name = "--txSize", description = "TX Batch Size")
+   int txBatchSize;
+
+   @Option(name = "--threads", description = "Number of Threads to be used 
(Default: 1)")
+   int threads = 1;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java
new file mode 100644
index 0000000..e55e0b8
--- /dev/null
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.activemq.artemis.cli.commands.util.ProducerThread;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+
+@Command(name = "produce", description = "It will send messages to an 
instance")
+public class Producer extends DestAbstract
+{
+
+   @Option(name = "--nonPersistent", description = "It will send messages non 
persistently")
+   boolean nonpersistent = false;
+
+   @Option(name = "--messageSize", description = "Size of each byteMessage 
(The producer will use byte message on this case)")
+   int messageSize = 0;
+
+   @Option(name = "--textSize", description = "Size of each textNessage (The 
producer will use text message on this case)")
+   int textMessageSize;
+
+   @Option(name = "--msgttl", description = "TTL for each message")
+   long msgTTL = 0L;
+
+   @Option(name = "--group", description = "Message Group to be used")
+   String msgGroupID = null;
+
+   @Override
+   public Object execute(ActionContext context) throws Exception
+   {
+      super.execute(context);
+
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerURL, user, password);
+
+      Destination dest = 
ActiveMQDestination.createDestination(this.destination, 
ActiveMQDestination.QUEUE_TYPE);
+      try (Connection connection = factory.createConnection())
+      {
+         ProducerThread[] threadsArray = new ProducerThread[threads];
+         for (int i = 0; i < threads; i++)
+         {
+            Session session;
+            if (txBatchSize > 0)
+            {
+               session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            }
+            else
+            {
+               session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            }
+            threadsArray[i] = new ProducerThread(session, dest, i);
+
+            
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
+                            
setMessageSize(messageSize).setTextMessageSize(textMessageSize).
+                            
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize);
+         }
+
+         for (ProducerThread thread : threadsArray)
+         {
+            thread.start();
+         }
+
+         for (ProducerThread thread : threadsArray)
+         {
+            thread.join();
+         }
+      }
+
+      return null;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java
index 8679600..e9b7705 100644
--- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java
@@ -34,6 +34,7 @@ import 
org.apache.activemq.artemis.factory.SecurityManagerFactory;
 import org.apache.activemq.artemis.integration.Broker;
 import 
org.apache.activemq.artemis.integration.bootstrap.ActiveMQBootstrapLogger;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 
 @Command(name = "run", description = "runs the broker instance")
 public class Run extends Configurable
@@ -41,6 +42,20 @@ public class Run extends Configurable
    @Option(name = "--allow-kill", description = "This will allow the server to 
kill itself. Useful for tests (failover tests for instance)")
    boolean allowKill;
 
+   static boolean embedded = false;
+
+   public static final ReusableLatch latchRunning = new ReusableLatch(0);
+
+   /**
+    * This will disable the System.exit at the end of the server.stop, as that 
means there are other things
+    * happening on the same VM.
+    * @param embedded
+    */
+   public static void setEmbedded(boolean embedded)
+   {
+      Run.embedded = true;
+   }
+
    private Broker server;
 
    private ArrayList<ActiveMQComponent> components = new ArrayList<>();
@@ -96,6 +111,8 @@ public class Run extends Configurable
     */
    private void addShutdownHook(File configurationDir)
    {
+
+      latchRunning.countUp();
       final File file = new File(configurationDir,"STOP_ME");
       if (file.exists())
       {
@@ -147,7 +164,11 @@ public class Run extends Configurable
                }
                finally
                {
-                  Runtime.getRuntime().exit(0);
+                  latchRunning.countDown();
+                  if (!embedded)
+                  {
+                     Runtime.getRuntime().exit(0);
+                  }
                }
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
new file mode 100644
index 0000000..3efa1af
--- /dev/null
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.util;
+
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.util.concurrent.CountDownLatch;
+
+public class ConsumerThread extends Thread
+{
+
+   int messageCount = 1000;
+   int receiveTimeOut = 3000;
+   Destination destination;
+   Session session;
+   boolean durable;
+   boolean breakOnNull = true;
+   int sleep;
+   int batchSize;
+   boolean verbose;
+
+   int received = 0;
+   int transactions = 0;
+   boolean running = false;
+   CountDownLatch finished;
+   boolean bytesAsText;
+
+   public ConsumerThread(Session session, Destination destination, int 
threadNr)
+   {
+      super("Consumer " + destination.toString() + ", thread=" + threadNr);
+      this.destination = destination;
+      this.session = session;
+   }
+
+   @Override
+   public void run()
+   {
+      running = true;
+      MessageConsumer consumer = null;
+      String threadName = Thread.currentThread().getName();
+      System.out.println(threadName + " wait until " + messageCount + " 
messages are consumed");
+      try
+      {
+         if (durable && destination instanceof Topic)
+         {
+            consumer = session.createDurableSubscriber((Topic) destination, 
getName());
+         }
+         else
+         {
+            consumer = session.createConsumer(destination);
+         }
+         while (running && received < messageCount)
+         {
+            Message msg = consumer.receive(receiveTimeOut);
+            if (msg != null)
+            {
+               System.out.println(threadName + " Received " + (msg instanceof 
TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
+               if (bytesAsText && (msg instanceof BytesMessage))
+               {
+                  long length = ((BytesMessage) msg).getBodyLength();
+                  byte[] bytes = new byte[(int) length];
+                  ((BytesMessage) msg).readBytes(bytes);
+                  System.out.println("Message:" + msg);
+               }
+               received++;
+            }
+            else
+            {
+               if (breakOnNull)
+               {
+                  break;
+               }
+            }
+
+            if (session.getTransacted())
+            {
+               if (batchSize > 0 && received > 0 && received % batchSize == 0)
+               {
+                  System.out.println(threadName + " Committing transaction: " 
+ transactions++);
+                  session.commit();
+               }
+            }
+            else if (session.getAcknowledgeMode() == 
Session.CLIENT_ACKNOWLEDGE)
+            {
+               if (batchSize > 0 && received > 0 && received % batchSize == 0)
+               {
+                  System.out.println("Acknowledging last " + batchSize + " 
messages; messages so far = " + received);
+                  msg.acknowledge();
+               }
+            }
+            if (sleep > 0)
+            {
+               Thread.sleep(sleep);
+            }
+
+         }
+
+         try
+         {
+            session.commit();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (finished != null)
+         {
+            finished.countDown();
+         }
+         if (consumer != null)
+         {
+            System.out.println(threadName + " Consumed: " + this.getReceived() 
+ " messages");
+            try
+            {
+               consumer.close();
+            }
+            catch (JMSException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+
+      System.out.println(threadName + " Consumer thread finished");
+   }
+
+   public int getReceived()
+   {
+      return received;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public ConsumerThread setDurable(boolean durable)
+   {
+      this.durable = durable;
+      return this;
+   }
+
+   public ConsumerThread setMessageCount(int messageCount)
+   {
+      this.messageCount = messageCount;
+      return this;
+   }
+
+   public ConsumerThread setBreakOnNull(boolean breakOnNull)
+   {
+      this.breakOnNull = breakOnNull;
+      return this;
+   }
+
+   public int getBatchSize()
+   {
+      return batchSize;
+   }
+
+   public ConsumerThread setBatchSize(int batchSize)
+   {
+      this.batchSize = batchSize;
+      return this;
+   }
+
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   public boolean isBreakOnNull()
+   {
+      return breakOnNull;
+   }
+
+   public int getReceiveTimeOut()
+   {
+      return receiveTimeOut;
+   }
+
+   public ConsumerThread setReceiveTimeOut(int receiveTimeOut)
+   {
+      this.receiveTimeOut = receiveTimeOut;
+      return this;
+   }
+
+   public boolean isRunning()
+   {
+      return running;
+   }
+
+   public ConsumerThread setRunning(boolean running)
+   {
+      this.running = running;
+      return this;
+   }
+
+   public int getSleep()
+   {
+      return sleep;
+   }
+
+   public ConsumerThread setSleep(int sleep)
+   {
+      this.sleep = sleep;
+      return this;
+   }
+
+   public CountDownLatch getFinished()
+   {
+      return finished;
+   }
+
+   public ConsumerThread setFinished(CountDownLatch finished)
+   {
+      this.finished = finished;
+      return this;
+   }
+
+   public boolean isBytesAsText()
+   {
+      return bytesAsText;
+   }
+
+   public boolean isVerbose()
+   {
+      return verbose;
+   }
+
+   public ConsumerThread setVerbose(boolean verbose)
+   {
+      this.verbose = verbose;
+      return this;
+   }
+
+   public ConsumerThread setBytesAsText(boolean bytesAsText)
+   {
+      this.bytesAsText = bytesAsText;
+      return this;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
new file mode 100644
index 0000000..820ebbd
--- /dev/null
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.util;
+
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.utils.ReusableLatch;
+
+public class ProducerThread extends Thread
+{
+   protected final Session session;
+
+   boolean verbose;
+   int messageCount = 1000;
+   boolean runIndefinitely = false;
+   Destination destination;
+   int sleep = 0;
+   boolean persistent = true;
+   int messageSize = 0;
+   int textMessageSize;
+   long msgTTL = 0L;
+   String msgGroupID = null;
+   int transactionBatchSize;
+
+   int transactions = 0;
+   final AtomicInteger sentCount = new AtomicInteger(0);
+   String message;
+   String messageText = null;
+   String payloadUrl = null;
+   byte[] payload = null;
+   boolean running = false;
+   final ReusableLatch finished = new ReusableLatch(1);
+   final ReusableLatch paused = new ReusableLatch(0);
+
+
+   public ProducerThread(Session session, Destination destination, int 
threadNr)
+   {
+      super("Producer " + destination.toString() + ", thread=" + threadNr);
+      this.destination = destination;
+      this.session = session;
+   }
+
+   public void run()
+   {
+      MessageProducer producer = null;
+      String threadName = Thread.currentThread().getName();
+      try
+      {
+         producer = session.createProducer(destination);
+         producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
+         producer.setTimeToLive(msgTTL);
+         initPayLoad();
+         running = true;
+
+         System.out.println(threadName + " Started to calculate elapsed time 
...\n");
+         long tStart = System.currentTimeMillis();
+
+         if (runIndefinitely)
+         {
+            while (running)
+            {
+               paused.await();
+               sendMessage(producer, threadName);
+               sentCount.incrementAndGet();
+            }
+         }
+         else
+         {
+            for (sentCount.set(0); sentCount.get() < messageCount && running; 
sentCount.incrementAndGet())
+            {
+               paused.await();
+               sendMessage(producer, threadName);
+            }
+         }
+
+         try
+         {
+            session.commit();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         System.out.println(threadName + " Produced: " + this.getSentCount() + 
" messages");
+         long tEnd = System.currentTimeMillis();
+         long elapsed = (tEnd - tStart) / 1000;
+         System.out.println(threadName + " Elapsed time in second : " + 
elapsed + " s");
+         System.out.println(threadName + " Elapsed time in milli second : " + 
(tEnd - tStart) + " milli seconds");
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (finished != null)
+         {
+            finished.countDown();
+         }
+         if (producer != null)
+         {
+            try
+            {
+               producer.close();
+            }
+            catch (JMSException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   private void sendMessage(MessageProducer producer, String threadName) 
throws Exception
+   {
+      Message message = createMessage(sentCount.get(), threadName);
+      producer.send(message);
+      if (verbose)
+      {
+         System.out.println(threadName + " Sent: " + (message instanceof 
TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
+      }
+
+      if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % 
transactionBatchSize == 0)
+      {
+         System.out.println(threadName + " Committing transaction: " + 
transactions++);
+         session.commit();
+      }
+
+      if (sleep > 0)
+      {
+         Thread.sleep(sleep);
+      }
+   }
+
+   private void initPayLoad()
+   {
+      if (messageSize > 0)
+      {
+         payload = new byte[messageSize];
+         for (int i = 0; i < payload.length; i++)
+         {
+            payload[i] = '.';
+         }
+      }
+   }
+
+   /**
+    * Builds the JMS message for one send.
+    * <p>
+    * If a binary payload exists a bytes message carrying it is created. Otherwise a text message is
+    * created whose text is chosen in this order: the first {@code textMessageSize} characters of the
+    * bundled {@code demo.txt} (read once and then reused), the content of {@code payloadUrl},
+    * the configured {@code message}, or a generated default text.
+    * <p>
+    * Every message gets the int property {@code count} and the string property {@code ThreadSent};
+    * {@code JMSXGroupID} is added when a non-empty group id is configured.
+    *
+    * @param i          message number stored in the {@code count} property
+    * @param threadName value stored in the {@code ThreadSent} property
+    * @return the populated message
+    * @throws Exception if the message cannot be created or populated
+    */
+   protected Message createMessage(int i, String threadName) throws Exception
+   {
+      final Message answer;
+      if (payload == null)
+      {
+         if (textMessageSize <= 0)
+         {
+            if (payloadUrl != null)
+            {
+               messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
+            }
+            else if (message != null)
+            {
+               messageText = message;
+            }
+            else
+            {
+               messageText = createDefaultMessage(i);
+            }
+         }
+         else if (messageText == null)
+         {
+            // sized text is loaded from the bundled resource only once
+            messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
+         }
+         answer = session.createTextMessage(messageText);
+      }
+      else
+      {
+         answer = session.createBytesMessage();
+         ((BytesMessage) answer).writeBytes(payload);
+      }
+
+      final boolean hasGroup = (msgGroupID != null) && (!msgGroupID.isEmpty());
+      if (hasGroup)
+      {
+         answer.setStringProperty("JMSXGroupID", msgGroupID);
+      }
+
+      answer.setIntProperty("count", i);
+      answer.setStringProperty("ThreadSent", threadName);
+      return answer;
+   }
+
+   /**
+    * Reads text from the given stream (decoded with the platform default charset) and closes it.
+    * <p>
+    * With a positive {@code size} at most {@code size} characters are returned; reading continues
+    * until that many characters were collected or the stream ends, because a single
+    * {@code read} call is allowed to return fewer characters than requested. With a
+    * non-positive {@code size} the whole stream is read.
+    * <p>
+    * If the stream is {@code null} (for example a classpath resource that was not found) or an
+    * {@link IOException} occurs while reading, the default message text is returned instead.
+    *
+    * @param is            stream to read, may be {@code null}
+    * @param size          maximum number of characters to return, or a non-positive value for no limit
+    * @param messageNumber message number used for the fallback text
+    * @return the text read, or the default message text on failure
+    * @throws IOException only if closing the reader fails
+    */
+   private String readInputStream(InputStream is, int size, int messageNumber) throws IOException
+   {
+      if (is == null)
+      {
+         // nothing to read from: use the same fallback as a failed read
+         return createDefaultMessage(messageNumber);
+      }
+
+      InputStreamReader reader = new InputStreamReader(is);
+      try
+      {
+         boolean bounded = size > 0;
+         char[] buffer = new char[bounded ? size : 1024];
+         StringBuilder builder = new StringBuilder();
+         while (!bounded || builder.length() < size)
+         {
+            // never ask for more than is still missing, so the result cannot exceed size
+            int wanted = bounded ? size - builder.length() : buffer.length;
+            int count = reader.read(buffer, 0, wanted);
+            if (count == -1)
+            {
+               break;
+            }
+            builder.append(buffer, 0, count);
+         }
+         return builder.toString();
+      }
+      catch (IOException ioe)
+      {
+         return createDefaultMessage(messageNumber);
+      }
+      finally
+      {
+         reader.close();
+      }
+   }
+
+   /**
+    * Produces the fallback message text for the given message number.
+    *
+    * @param messageNumber number appended to the fixed prefix
+    * @return {@code "test message: "} followed by the number
+    */
+   private String createDefaultMessage(int messageNumber)
+   {
+      final StringBuilder text = new StringBuilder("test message: ");
+      text.append(messageNumber);
+      return text.toString();
+   }
+
+   /** Sets the configured message count and returns this instance for chaining. */
+   public ProducerThread setMessageCount(int messageCount)
+   {
+      this.messageCount = messageCount;
+      return this;
+   }
+
+   /** Returns the pause in milliseconds applied by sendMessage after each send; not applied unless positive. */
+   public int getSleep()
+   {
+      return sleep;
+   }
+
+   /** Sets the pause in milliseconds applied by sendMessage after each send (only when positive); returns this instance for chaining. */
+   public ProducerThread setSleep(int sleep)
+   {
+      this.sleep = sleep;
+      return this;
+   }
+
+   /** Returns the configured message count. */
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   /** Returns the current value of the sent-message counter. */
+   public int getSentCount()
+   {
+      return sentCount.get();
+   }
+
+   /** Returns the persistent flag (presumably selects the JMS delivery mode; confirm where the producer is set up). */
+   public boolean isPersistent()
+   {
+      return persistent;
+   }
+
+   /** Sets the persistent flag and returns this instance for chaining. */
+   public ProducerThread setPersistent(boolean persistent)
+   {
+      this.persistent = persistent;
+      return this;
+   }
+
+   /** Returns the running flag. */
+   public boolean isRunning()
+   {
+      return running;
+   }
+
+   /** Sets the running flag and returns this instance for chaining. */
+   public ProducerThread setRunning(boolean running)
+   {
+      this.running = running;
+      return this;
+   }
+
+   /** Returns the configured message time-to-live (unit not visible here; presumably milliseconds as in JMS, confirm). */
+   public long getMsgTTL()
+   {
+      return msgTTL;
+   }
+
+   /** Sets the message time-to-live and returns this instance for chaining. */
+   public ProducerThread setMsgTTL(long msgTTL)
+   {
+      this.msgTTL = msgTTL;
+      return this;
+   }
+
+   /** Returns the batch size at which sendMessage commits the session; no commit happens unless it is positive. */
+   public int getTransactionBatchSize()
+   {
+      return transactionBatchSize;
+   }
+
+   /**
+    * Sets the batch size: sendMessage commits whenever the sent counter is a positive multiple of it.
+    * A value of zero or less disables these commits. Returns this instance for chaining.
+    */
+   public ProducerThread setTransactionBatchSize(int transactionBatchSize)
+   {
+      this.transactionBatchSize = transactionBatchSize;
+      return this;
+   }
+
+   /** Returns the group id that createMessage stores in the JMSXGroupID property, or null if none is set. */
+   public String getMsgGroupID()
+   {
+      return msgGroupID;
+   }
+
+   /** Sets the group id stored in the JMSXGroupID property (ignored when null or empty); returns this instance for chaining. */
+   public ProducerThread setMsgGroupID(String msgGroupID)
+   {
+      this.msgGroupID = msgGroupID;
+      return this;
+   }
+
+   /** Returns the size passed to readInputStream when createMessage loads the text from demo.txt. */
+   public int getTextMessageSize()
+   {
+      return textMessageSize;
+   }
+
+   /**
+    * Sets the text size. When positive and no binary payload exists, createMessage takes the
+    * message text from the bundled demo.txt using this size. Returns this instance for chaining.
+    */
+   public ProducerThread setTextMessageSize(int textMessageSize)
+   {
+      this.textMessageSize = textMessageSize;
+      return this;
+   }
+
+   /** Returns the size in bytes of the binary payload built by initPayLoad. */
+   public int getMessageSize()
+   {
+      return messageSize;
+   }
+
+   /**
+    * Sets the binary payload size in bytes; initPayLoad only builds a payload when it is positive.
+    * Returns this instance for chaining.
+    */
+   public ProducerThread setMessageSize(int messageSize)
+   {
+      this.messageSize = messageSize;
+      return this;
+   }
+
+   /** Returns the latch held in the finished field. */
+   public ReusableLatch getFinished()
+   {
+      return finished;
+   }
+
+   /** Sets the count of the finished latch to the given value and returns this instance for chaining. */
+   public ProducerThread setFinished(int value)
+   {
+      finished.setCount(value);
+      return this;
+   }
+
+   /** Returns the URL whose content createMessage can use as message text, or null if none is set. */
+   public String getPayloadUrl()
+   {
+      return payloadUrl;
+   }
+
+   /**
+    * Sets the URL whose content createMessage reads as message text when there is no binary
+    * payload and no positive text size. Returns this instance for chaining.
+    */
+   public ProducerThread setPayloadUrl(String payloadUrl)
+   {
+      this.payloadUrl = payloadUrl;
+      return this;
+   }
+
+   /** Returns the fixed message text, or null if none is set. */
+   public String getMessage()
+   {
+      return message;
+   }
+
+   /**
+    * Sets the fixed message text. createMessage uses it only when there is no binary payload,
+    * no positive text size and no payload URL. Returns this instance for chaining.
+    */
+   public ProducerThread setMessage(String message)
+   {
+      this.message = message;
+      return this;
+   }
+
+   /** Returns the runIndefinitely flag. */
+   public boolean isRunIndefinitely()
+   {
+      return runIndefinitely;
+   }
+
+   /** Sets the runIndefinitely flag and returns this instance for chaining. */
+   public ProducerThread setRunIndefinitely(boolean runIndefinitely)
+   {
+      this.runIndefinitely = runIndefinitely;
+      return this;
+   }
+
+   /**
+    * Counts the paused latch up by one and returns this instance for chaining.
+    * NOTE(review): presumably the send loop waits on this latch; confirm in run().
+    */
+   public ProducerThread pauseProducer()
+   {
+      this.paused.countUp();
+      return this;
+   }
+
+   /** Counts the paused latch down by one (the counterpart of pauseProducer) and returns this instance for chaining. */
+   public ProducerThread resumeProducer()
+   {
+      this.paused.countDown();
+      return this;
+   }
+
+   /** Resets the sent-message counter to zero and returns this instance for chaining. */
+   public ProducerThread resetCounters()
+   {
+      this.sentCount.set(0);
+      return this;
+   }
+
+
+   /** Returns whether sendMessage prints a line for every message sent. */
+   public boolean isVerbose()
+   {
+      return verbose;
+   }
+
+   /** Enables or disables the per-message console output of sendMessage; returns this instance for chaining. */
+   public ProducerThread setVerbose(boolean verbose)
+   {
+      this.verbose = verbose;
+      return this;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt
 
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt
new file mode 100644
index 0000000..4a6002e
--- /dev/null
+++ 
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt
@@ -0,0 +1,15 @@
+Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy 
eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam 
voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita 
kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem 
ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod 
tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At 
vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd 
gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum 
dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor 
invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero 
eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no 
sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie 
consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan 
et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis 
dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer 
adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore 
magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit 
lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure 
dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore 
eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui 
blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla 
facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet 
doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, 
consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut 
laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis 
nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea 
commodo consequat.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie 
consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd 
gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum 
dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor 
invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero 
eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no 
sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit 
amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores 
duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet 
clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero 
voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, 
consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore 
et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore 
et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et 
justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata 
sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur 
sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore 
magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo 
dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est 
Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing 
elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna 
aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores 
et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum 
dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor 
invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero 
eos 
 et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea 
takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, 
consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore 
et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et 
justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata 
sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur 
sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore 
magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo 
dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est 
Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie 
consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan 
et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis 
dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer 
adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore 
magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit 
lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure 
dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore 
eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui 
blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla 
facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet 
doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, 
consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut 
laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis 
nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea 
commodo consequat.Duis autem vel eum iriure dolor in hendrerit in vulputate 
velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd 
gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum 
dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor 
invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero 
eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no 
sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit 
amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores 
duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet 
clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero 
voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, 
consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore 
et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore 
et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et 
justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata 
sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur 
sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore 
magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo 
dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est 
Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing 
elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna 
aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores 
et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum 
dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor 
invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero 
eos 
 et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea 
takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, 
consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore 
et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et 
justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata 
sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur 
sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore 
magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo 
dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est 
Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie 
consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan 
et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis 
dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer 
adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore 
magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit 
lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure 
dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore 
eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui 
blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla 
facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet 
doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, 
consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut 
laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis 
nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea 
commodo consequat.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java 
b/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java
index e38ed14..7d00328 100644
--- 
a/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java
+++ 
b/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java
@@ -16,15 +16,40 @@
  */
 package org.apache.activemq.artemis.test;
 
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.cli.Artemis;
+import org.apache.activemq.artemis.cli.commands.Run;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Test to validate that the CLI doesn't throw improper exceptions when 
invoked.
  */
 public class ArtemisTest
 {
+   /** JUnit temporary-folder rule; the constructor roots it under ./target/tmp. */
+   @Rule
+   public TemporaryFolder temporaryFolder;
+
+   /**
+    * Creates the ./target/tmp directory (including parents) if needed and uses it as the
+    * parent directory of the temporary-folder rule.
+    */
+   public ArtemisTest()
+   {
+      File parent = new File("./target/tmp");
+      parent.mkdirs();
+      temporaryFolder = new TemporaryFolder(parent);
+   }
+
+
+   /**
+    * Undoes the global state changed by testSimpleRun: clears the artemis.instance system
+    * property and switches Run back out of embedded mode.
+    */
+   @After
+   public void cleanup()
+   {
+      System.clearProperty("artemis.instance");
+      Run.setEmbedded(false);
+   }
+
    @Test
    public void invalidCliDoesntThrowException()
    {
@@ -37,6 +62,22 @@ public class ArtemisTest
       testCli("create","/rawr");
    }
 
+   /**
+    * Drives the CLI through a full cycle: creates an instance in the temporary folder, runs it
+    * in embedded mode, produces and consumes messages with a transaction size of 500, stops the
+    * server, prints the data, and finally waits up to five seconds on Run.latchRunning.
+    */
+   @Test
+   public void testSimpleRun() throws Exception
+   {
+      Run.setEmbedded(true);
+      Artemis.main("create", temporaryFolder.getRoot().getAbsolutePath(), 
"--force", "--silent-input", "--no-web");
+      System.setProperty("artemis.instance", 
temporaryFolder.getRoot().getAbsolutePath());
+      // Some exceptions may be thrown during initialization, but the basic
+      // core protocol should still start correctly.
+      Artemis.main("run");
+      Artemis.main("produce", "--txSize", "500");
+      Artemis.main("consume", "--txSize", "500", "--verbose");
+      Artemis.main("stop");
+      Artemis.main("data", "print");
+      Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
+
+   }
+
    private void testCli(String... args)
    {
       try

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-distribution/src/main/assembly/dep.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/dep.xml 
b/artemis-distribution/src/main/assembly/dep.xml
index 686ed6f..febc8ec 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -83,6 +83,7 @@
             <include>org.apache.tomcat:tomcat-servlet-api</include>
             <include>commons-beanutils:commons-beanutils</include>
             <include>commons-logging:commons-logging</include>
+            <include>commons-collections:commons-collections</include>
             <include>org.fusesource.hawtbuf:hawtbuf</include>
             <include>org.jgroups:jgroups</include>
          </includes>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 5be2aa2..2927f2d 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -16,15 +16,14 @@
  */
 package org.apache.activemq.artemis.jms.client;
 
-import java.io.Serializable;
-import java.util.UUID;
-
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.JMSRuntimeException;
 import javax.naming.NamingException;
 import javax.naming.Reference;
 import javax.naming.Referenceable;
+import java.io.Serializable;
+import java.util.UUID;
 
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -53,6 +52,17 @@ public class ActiveMQDestination implements Destination, 
Serializable, Reference
 
    public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";
 
+   // Name prefixes recognised by createDestination; the prefix selects the destination kind.
+   // NOTE(review): "QUALIFED" is misspelled in the two temp constants, but the names are public API.
+   public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
+   public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
+   public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
+   public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
+   // Destination type codes accepted as the defaultType argument of createDestination.
+   public static final byte QUEUE_TYPE = 0x01;
+   public static final byte TOPIC_TYPE = 0x02;
+   // Bit OR-ed into a base type to form the temporary variants below.
+   public static final byte TEMP_MASK = 0x04;
+   public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+   public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+
+
    private static final char SEPARATOR = '.';
 
    private static String escape(final String input)
@@ -64,6 +74,44 @@ public class ActiveMQDestination implements Destination, 
Serializable, Reference
       return input.replace("\\", "\\\\").replace(".", "\\.");
    }
 
+   /**
+    * Creates a destination from a name that may carry a kind prefix.
+    * <p>
+    * A name starting with {@code queue://}, {@code topic://}, {@code temp-queue://} or
+    * {@code temp-topic://} yields a destination of that kind, named by the remainder after
+    * the prefix. Any other name is used as is and {@code defaultType} selects the kind.
+    *
+    * @param name        destination name, optionally prefixed
+    * @param defaultType one of {@code QUEUE_TYPE}, {@code TOPIC_TYPE}, {@code TEMP_QUEUE_TYPE} or
+    *                    {@code TEMP_TOPIC_TYPE}; only consulted for unprefixed names
+    * @return the new queue or topic
+    * @throws IllegalArgumentException if the name is unprefixed and {@code defaultType} is none of
+    *                                  the four known types
+    */
+   public static ActiveMQDestination createDestination(String name, byte defaultType)
+   {
+      // An explicit prefix always wins over the default type.
+      if (name.startsWith(QUEUE_QUALIFIED_PREFIX))
+      {
+         final String queueName = name.substring(QUEUE_QUALIFIED_PREFIX.length());
+         return new ActiveMQQueue(queueName);
+      }
+      if (name.startsWith(TOPIC_QUALIFIED_PREFIX))
+      {
+         final String topicName = name.substring(TOPIC_QUALIFIED_PREFIX.length());
+         return new ActiveMQTopic(topicName);
+      }
+      if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX))
+      {
+         final String tempQueueName = name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length());
+         return new ActiveMQQueue(tempQueueName, true);
+      }
+      if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX))
+      {
+         final String tempTopicName = name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length());
+         return new ActiveMQTopic(tempTopicName, true);
+      }
+
+      if (defaultType == QUEUE_TYPE)
+      {
+         return new ActiveMQQueue(name);
+      }
+      if (defaultType == TOPIC_TYPE)
+      {
+         return new ActiveMQTopic(name);
+      }
+      if (defaultType == TEMP_QUEUE_TYPE)
+      {
+         return new ActiveMQQueue(name, true);
+      }
+      if (defaultType == TEMP_TOPIC_TYPE)
+      {
+         return new ActiveMQTopic(name, true);
+      }
+      throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+   }
+
+
    public static Destination fromAddress(final String address)
    {
       if (address.startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX))
@@ -356,7 +404,7 @@ public class ActiveMQDestination implements Destination, 
Serializable, Reference
          return false;
       }
 
-      ActiveMQDestination that = (ActiveMQDestination)o;
+      ActiveMQDestination that = (ActiveMQDestination) o;
 
       return address.equals(that.address);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
index 9143f34..0fac316 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
@@ -46,6 +46,11 @@ public class ActiveMQQueue extends ActiveMQDestination 
implements Queue
       super(JMS_QUEUE_ADDRESS_PREFIX + name, name, false, true, null);
    }
 
+   /**
+    * Creates a queue whose address is JMS_QUEUE_ADDRESS_PREFIX followed by the name, passing
+    * the temporary flag on to the superclass constructor.
+    */
+   public ActiveMQQueue(final String name, boolean temporary)
+   {
+      super(JMS_QUEUE_ADDRESS_PREFIX + name, name, temporary, true, null);
+   }
+
 
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
index 88ea605..d7a6f38 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
@@ -43,7 +43,12 @@ public class ActiveMQTopic extends ActiveMQDestination 
implements Topic
 
+   /** Creates a non-temporary topic with the given name. */
    public ActiveMQTopic(final String name)
    {
-      super(JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null);
+      this(name, false);
+   }
+
+   /**
+    * Creates a topic whose address is JMS_TOPIC_ADDRESS_PREFIX followed by the name, passing
+    * the temporary flag on to the superclass constructor.
+    */
+   public ActiveMQTopic(final String name, boolean temporary)
+   {
+      super(JMS_TOPIC_ADDRESS_PREFIX + name, name, temporary, false, null);
    }
 
 

Reply via email to