Repository: activemq-artemis Updated Branches: refs/heads/master 077a416ee -> c6cba4088
ARTEMIS-144 Producer / Consumer command https://issues.apache.org/jira/browse/ARTEMIS-144 As I was working through the examples I realized we need the tool used to consume and send messages through the console. This will import such a tool into the CLI, as users are used to consuming and sending messages with it. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cf777ec6 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cf777ec6 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cf777ec6 Branch: refs/heads/master Commit: cf777ec6b69c4ebe872746e8f984dde3836ce0b2 Parents: 077a416 Author: Clebert Suconic <[email protected]> Authored: Mon Jun 22 16:20:29 2015 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue Jun 23 20:40:27 2015 -0400 ---------------------------------------------------------------------- .../apache/activemq/artemis/cli/Artemis.java | 6 +- .../activemq/artemis/cli/commands/Consumer.java | 88 ++++ .../activemq/artemis/cli/commands/Create.java | 7 +- .../artemis/cli/commands/DestAbstract.java | 51 +++ .../activemq/artemis/cli/commands/Producer.java | 92 ++++ .../activemq/artemis/cli/commands/Run.java | 23 +- .../cli/commands/util/ConsumerThread.java | 268 ++++++++++++ .../cli/commands/util/ProducerThread.java | 431 +++++++++++++++++++ .../activemq/artemis/cli/commands/util/demo.txt | 15 + .../activemq/artemis/test/ArtemisTest.java | 41 ++ artemis-distribution/src/main/assembly/dep.xml | 1 + .../artemis/jms/client/ActiveMQDestination.java | 56 ++- .../artemis/jms/client/ActiveMQQueue.java | 5 + .../artemis/jms/client/ActiveMQTopic.java | 7 +- 14 files changed, 1083 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java 
---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index 137d36c..636d729 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -22,9 +22,11 @@ import java.io.OutputStream; import io.airlift.airline.Cli; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.Consumer; import org.apache.activemq.artemis.cli.commands.Create; import org.apache.activemq.artemis.cli.commands.HelpAction; import org.apache.activemq.artemis.cli.commands.Kill; +import org.apache.activemq.artemis.cli.commands.Producer; import org.apache.activemq.artemis.cli.commands.Run; import org.apache.activemq.artemis.cli.commands.Stop; import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal; @@ -37,12 +39,14 @@ import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; public class Artemis { @SuppressWarnings("unchecked") - public static void main(String[] args) throws Exception + public static void main(String...args) throws Exception { String instance = System.getProperty("artemis.instance"); Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis") .withDescription("ActiveMQ Artemis Command Line") .withCommand(HelpAction.class) + .withCommand(Producer.class) + .withCommand(Consumer.class) .withDefaultCommand(HelpAction.class); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java new file mode 100644 index 0000000..96b8455 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Consumer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.cli.commands; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.cli.commands.util.ConsumerThread; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; + +@Command(name = "consume", description = "It will send consume messages from an instance") +public class Consumer extends DestAbstract +{ + + + @Option(name = "--durable", description = "It will use durable subscription in case of client") + boolean durable = false; + + @Option(name = "--breakOnNull", description = "It will break on null messages") + boolean breakOnNull = false; + + @Option(name = "--receiveTimeout", description = "Time used on receive(timeout)") + int receiveTimeout; + + @Override + public Object execute(ActionContext context) throws Exception + { + super.execute(context); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password); + + Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE); + try (Connection connection = factory.createConnection()) + { + ConsumerThread[] threadsArray = new ConsumerThread[threads]; + for (int i = 0; i < threads; i++) + { + Session session; + if (txBatchSize > 0) + { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + threadsArray[i] = new ConsumerThread(session, dest, i); + + threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull) + .setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout); + } + + for (ConsumerThread thread : threadsArray) + { + thread.start(); + } + + connection.start(); + + for (ConsumerThread 
thread : threadsArray) + { + thread.join(); + } + } + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 8d5f87e..182b662 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -248,7 +248,12 @@ public class Create extends InputAbstract { if (home == null) { - home = new File(System.getProperty("artemis.home")); + String homeStr = System.getProperty("artemis.home"); + if (homeStr == null) + { + homeStr = "."; + } + home = new File(homeStr); } return home; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java new file mode 100644 index 0000000..b54e7b8 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/DestAbstract.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands; + + import io.airlift.airline.Option; + +public class DestAbstract extends ActionAbstract +{ + @Option(name = "--url", description = "URL towards the broker. (default: tcp://localhost:61616)") + String brokerURL = "tcp://localhost:61616"; + + @Option(name = "--destination", description = "Destination to be used. it could be prefixed with queue:// or topic:: (Default: queue://TEST") + String destination = "queue://TEST"; + + @Option(name = "--messageCount", description = "Number of messages to act on (Default: 1000)") + int messageCount = 1000; + + @Option(name = "--user", description = "User used to connect") + String user; + + @Option(name = "--password", description = "Password used to connect") + String password; + + @Option(name = "--verbose", description = "It will print messages individually") + boolean verbose; + + @Option(name = "--sleep", description = "Time wait between each message") + int sleep = 0; + + @Option(name = "--txSize", description = "TX Batch Size") + int txBatchSize; + + @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)") + int threads = 1; + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java 
new file mode 100644 index 0000000..e55e0b8 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Producer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.cli.commands.util.ProducerThread; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; + +@Command(name = "produce", description = "It will send messages to an instance") +public class Producer extends DestAbstract +{ + + @Option(name = "--nonPersistent", description = "It will send messages non persistently") + boolean nonpersistent = false; + + @Option(name = "--messageSize", description = "Size of each byteMessage (The producer will use byte message on this case)") + int messageSize = 0; + + @Option(name = "--textSize", description = "Size of each textNessage (The producer will use text message on this case)") + int textMessageSize; + + @Option(name = "--msgttl", 
description = "TTL for each message") + long msgTTL = 0L; + + @Option(name = "--group", description = "Message Group to be used") + String msgGroupID = null; + + @Override + public Object execute(ActionContext context) throws Exception + { + super.execute(context); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL, user, password); + + Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.QUEUE_TYPE); + try (Connection connection = factory.createConnection()) + { + ProducerThread[] threadsArray = new ProducerThread[threads]; + for (int i = 0; i < threads; i++) + { + Session session; + if (txBatchSize > 0) + { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + threadsArray[i] = new ProducerThread(session, dest, i); + + threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). + setMessageSize(messageSize).setTextMessageSize(textMessageSize). 
+ setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize); + } + + for (ProducerThread thread : threadsArray) + { + thread.start(); + } + + for (ProducerThread thread : threadsArray) + { + thread.join(); + } + } + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java index 8679600..e9b7705 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.factory.SecurityManagerFactory; import org.apache.activemq.artemis.integration.Broker; import org.apache.activemq.artemis.integration.bootstrap.ActiveMQBootstrapLogger; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; +import org.apache.activemq.artemis.utils.ReusableLatch; @Command(name = "run", description = "runs the broker instance") public class Run extends Configurable @@ -41,6 +42,20 @@ public class Run extends Configurable @Option(name = "--allow-kill", description = "This will allow the server to kill itself. Useful for tests (failover tests for instance)") boolean allowKill; + static boolean embedded = false; + + public static final ReusableLatch latchRunning = new ReusableLatch(0); + + /** + * This will disable the System.exit at the end of the server.stop, as that means there are other things + * happening on the same VM. 
+ * @param embedded + */ + public static void setEmbedded(boolean embedded) + { + Run.embedded = true; + } + private Broker server; private ArrayList<ActiveMQComponent> components = new ArrayList<>(); @@ -96,6 +111,8 @@ public class Run extends Configurable */ private void addShutdownHook(File configurationDir) { + + latchRunning.countUp(); final File file = new File(configurationDir,"STOP_ME"); if (file.exists()) { @@ -147,7 +164,11 @@ public class Run extends Configurable } finally { - Runtime.getRuntime().exit(0); + latchRunning.countDown(); + if (!embedded) + { + Runtime.getRuntime().exit(0); + } } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java new file mode 100644 index 0000000..3efa1af --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ConsumerThread.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.util; + + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; + +public class ConsumerThread extends Thread +{ + + int messageCount = 1000; + int receiveTimeOut = 3000; + Destination destination; + Session session; + boolean durable; + boolean breakOnNull = true; + int sleep; + int batchSize; + boolean verbose; + + int received = 0; + int transactions = 0; + boolean running = false; + CountDownLatch finished; + boolean bytesAsText; + + public ConsumerThread(Session session, Destination destination, int threadNr) + { + super("Consumer " + destination.toString() + ", thread=" + threadNr); + this.destination = destination; + this.session = session; + } + + @Override + public void run() + { + running = true; + MessageConsumer consumer = null; + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + " wait until " + messageCount + " messages are consumed"); + try + { + if (durable && destination instanceof Topic) + { + consumer = session.createDurableSubscriber((Topic) destination, getName()); + } + else + { + consumer = session.createConsumer(destination); + } + while (running && received < messageCount) + { + Message msg = consumer.receive(receiveTimeOut); + if (msg != null) + { + System.out.println(threadName + " Received " + (msg instanceof TextMessage ? 
((TextMessage) msg).getText() : msg.getJMSMessageID())); + if (bytesAsText && (msg instanceof BytesMessage)) + { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Message:" + msg); + } + received++; + } + else + { + if (breakOnNull) + { + break; + } + } + + if (session.getTransacted()) + { + if (batchSize > 0 && received > 0 && received % batchSize == 0) + { + System.out.println(threadName + " Committing transaction: " + transactions++); + session.commit(); + } + } + else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + if (batchSize > 0 && received > 0 && received % batchSize == 0) + { + System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received); + msg.acknowledge(); + } + } + if (sleep > 0) + { + Thread.sleep(sleep); + } + + } + + try + { + session.commit(); + } + catch (Throwable ignored) + { + } + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + if (finished != null) + { + finished.countDown(); + } + if (consumer != null) + { + System.out.println(threadName + " Consumed: " + this.getReceived() + " messages"); + try + { + consumer.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } + } + + System.out.println(threadName + " Consumer thread finished"); + } + + public int getReceived() + { + return received; + } + + public boolean isDurable() + { + return durable; + } + + public ConsumerThread setDurable(boolean durable) + { + this.durable = durable; + return this; + } + + public ConsumerThread setMessageCount(int messageCount) + { + this.messageCount = messageCount; + return this; + } + + public ConsumerThread setBreakOnNull(boolean breakOnNull) + { + this.breakOnNull = breakOnNull; + return this; + } + + public int getBatchSize() + { + return batchSize; + } + + public ConsumerThread setBatchSize(int batchSize) + { + this.batchSize = batchSize; + return 
this; + } + + public int getMessageCount() + { + return messageCount; + } + + public boolean isBreakOnNull() + { + return breakOnNull; + } + + public int getReceiveTimeOut() + { + return receiveTimeOut; + } + + public ConsumerThread setReceiveTimeOut(int receiveTimeOut) + { + this.receiveTimeOut = receiveTimeOut; + return this; + } + + public boolean isRunning() + { + return running; + } + + public ConsumerThread setRunning(boolean running) + { + this.running = running; + return this; + } + + public int getSleep() + { + return sleep; + } + + public ConsumerThread setSleep(int sleep) + { + this.sleep = sleep; + return this; + } + + public CountDownLatch getFinished() + { + return finished; + } + + public ConsumerThread setFinished(CountDownLatch finished) + { + this.finished = finished; + return this; + } + + public boolean isBytesAsText() + { + return bytesAsText; + } + + public boolean isVerbose() + { + return verbose; + } + + public ConsumerThread setVerbose(boolean verbose) + { + this.verbose = verbose; + return this; + } + + public ConsumerThread setBytesAsText(boolean bytesAsText) + { + this.bytesAsText = bytesAsText; + return this; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java new file mode 100644 index 0000000..820ebbd --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/ProducerThread.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.util; + + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.utils.ReusableLatch; + +public class ProducerThread extends Thread +{ + protected final Session session; + + boolean verbose; + int messageCount = 1000; + boolean runIndefinitely = false; + Destination destination; + int sleep = 0; + boolean persistent = true; + int messageSize = 0; + int textMessageSize; + long msgTTL = 0L; + String msgGroupID = null; + int transactionBatchSize; + + int transactions = 0; + final AtomicInteger sentCount = new AtomicInteger(0); + String message; + String messageText = null; + String payloadUrl = null; + byte[] payload = null; + boolean running = false; + final ReusableLatch finished = new ReusableLatch(1); + final ReusableLatch paused = new ReusableLatch(0); + + + public ProducerThread(Session session, Destination destination, int threadNr) + { + super("Producer " + destination.toString() + ", thread=" + threadNr); + this.destination = destination; + 
this.session = session; + } + + public void run() + { + MessageProducer producer = null; + String threadName = Thread.currentThread().getName(); + try + { + producer = session.createProducer(destination); + producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + producer.setTimeToLive(msgTTL); + initPayLoad(); + running = true; + + System.out.println(threadName + " Started to calculate elapsed time ...\n"); + long tStart = System.currentTimeMillis(); + + if (runIndefinitely) + { + while (running) + { + paused.await(); + sendMessage(producer, threadName); + sentCount.incrementAndGet(); + } + } + else + { + for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet()) + { + paused.await(); + sendMessage(producer, threadName); + } + } + + try + { + session.commit(); + } + catch (Throwable ignored) + { + } + + System.out.println(threadName + " Produced: " + this.getSentCount() + " messages"); + long tEnd = System.currentTimeMillis(); + long elapsed = (tEnd - tStart) / 1000; + System.out.println(threadName + " Elapsed time in second : " + elapsed + " s"); + System.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds"); + + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + if (finished != null) + { + finished.countDown(); + } + if (producer != null) + { + try + { + producer.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } + } + } + + private void sendMessage(MessageProducer producer, String threadName) throws Exception + { + Message message = createMessage(sentCount.get(), threadName); + producer.send(message); + if (verbose) + { + System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? 
((TextMessage) message).getText() : message.getJMSMessageID())); + } + + if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) + { + System.out.println(threadName + " Committing transaction: " + transactions++); + session.commit(); + } + + if (sleep > 0) + { + Thread.sleep(sleep); + } + } + + private void initPayLoad() + { + if (messageSize > 0) + { + payload = new byte[messageSize]; + for (int i = 0; i < payload.length; i++) + { + payload[i] = '.'; + } + } + } + + protected Message createMessage(int i, String threadName) throws Exception + { + Message answer; + if (payload != null) + { + answer = session.createBytesMessage(); + ((BytesMessage) answer).writeBytes(payload); + } + else + { + if (textMessageSize > 0) + { + if (messageText == null) + { + messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i); + } + } + else if (payloadUrl != null) + { + messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i); + } + else if (message != null) + { + messageText = message; + } + else + { + messageText = createDefaultMessage(i); + } + answer = session.createTextMessage(messageText); + } + if ((msgGroupID != null) && (!msgGroupID.isEmpty())) + { + answer.setStringProperty("JMSXGroupID", msgGroupID); + } + + answer.setIntProperty("count", i); + answer.setStringProperty("ThreadSent", threadName); + return answer; + } + + private String readInputStream(InputStream is, int size, int messageNumber) throws IOException + { + InputStreamReader reader = new InputStreamReader(is); + try + { + char[] buffer; + if (size > 0) + { + buffer = new char[size]; + } + else + { + buffer = new char[1024]; + } + int count; + StringBuilder builder = new StringBuilder(); + while ((count = reader.read(buffer)) != -1) + { + builder.append(buffer, 0, count); + if (size > 0) break; + } + return builder.toString(); + } + catch (IOException ioe) + { + return createDefaultMessage(messageNumber); + } + 
finally + { + reader.close(); + } + } + + private String createDefaultMessage(int messageNumber) + { + return "test message: " + messageNumber; + } + + public ProducerThread setMessageCount(int messageCount) + { + this.messageCount = messageCount; + return this; + } + + public int getSleep() + { + return sleep; + } + + public ProducerThread setSleep(int sleep) + { + this.sleep = sleep; + return this; + } + + public int getMessageCount() + { + return messageCount; + } + + public int getSentCount() + { + return sentCount.get(); + } + + public boolean isPersistent() + { + return persistent; + } + + public ProducerThread setPersistent(boolean persistent) + { + this.persistent = persistent; + return this; + } + + public boolean isRunning() + { + return running; + } + + public ProducerThread setRunning(boolean running) + { + this.running = running; + return this; + } + + public long getMsgTTL() + { + return msgTTL; + } + + public ProducerThread setMsgTTL(long msgTTL) + { + this.msgTTL = msgTTL; + return this; + } + + public int getTransactionBatchSize() + { + return transactionBatchSize; + } + + public ProducerThread setTransactionBatchSize(int transactionBatchSize) + { + this.transactionBatchSize = transactionBatchSize; + return this; + } + + public String getMsgGroupID() + { + return msgGroupID; + } + + public ProducerThread setMsgGroupID(String msgGroupID) + { + this.msgGroupID = msgGroupID; + return this; + } + + public int getTextMessageSize() + { + return textMessageSize; + } + + public ProducerThread setTextMessageSize(int textMessageSize) + { + this.textMessageSize = textMessageSize; + return this; + } + + public int getMessageSize() + { + return messageSize; + } + + public ProducerThread setMessageSize(int messageSize) + { + this.messageSize = messageSize; + return this; + } + + public ReusableLatch getFinished() + { + return finished; + } + + public ProducerThread setFinished(int value) + { + finished.setCount(value); + return this; + } + + public String 
getPayloadUrl() + { + return payloadUrl; + } + + public ProducerThread setPayloadUrl(String payloadUrl) + { + this.payloadUrl = payloadUrl; + return this; + } + + public String getMessage() + { + return message; + } + + public ProducerThread setMessage(String message) + { + this.message = message; + return this; + } + + public boolean isRunIndefinitely() + { + return runIndefinitely; + } + + public ProducerThread setRunIndefinitely(boolean runIndefinitely) + { + this.runIndefinitely = runIndefinitely; + return this; + } + + public ProducerThread pauseProducer() + { + this.paused.countUp(); + return this; + } + + public ProducerThread resumeProducer() + { + this.paused.countDown(); + return this; + } + + public ProducerThread resetCounters() + { + this.sentCount.set(0); + return this; + } + + + public boolean isVerbose() + { + return verbose; + } + + public ProducerThread setVerbose(boolean verbose) + { + this.verbose = verbose; + return this; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt new file mode 100644 index 0000000..4a6002e --- /dev/null +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/util/demo.txt @@ -0,0 +1,15 @@ +Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. 
At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis. +At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. 
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat. +Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. 
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis. +At vero eos et accusam et justo duo dolores et ea rebum. 
Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat. +Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. 
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java index e38ed14..7d00328 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/artemis/test/ArtemisTest.java @@ -16,15 +16,40 @@ */ package org.apache.activemq.artemis.test; +import java.io.File; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.cli.Artemis; +import org.apache.activemq.artemis.cli.commands.Run; +import org.junit.After; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** * Test to validate that the CLI doesn't throw improper exceptions when invoked. 
*/ public class ArtemisTest { + @Rule + public TemporaryFolder temporaryFolder; + + public ArtemisTest() + { + File parent = new File("./target/tmp"); + parent.mkdirs(); + temporaryFolder = new TemporaryFolder(parent); + } + + + @After + public void cleanup() + { + System.clearProperty("artemis.instance"); + Run.setEmbedded(false); + } + @Test public void invalidCliDoesntThrowException() { @@ -37,6 +62,22 @@ public class ArtemisTest testCli("create","/rawr"); } + @Test + public void testSimpleRun() throws Exception + { + Run.setEmbedded(true); + Artemis.main("create", temporaryFolder.getRoot().getAbsolutePath(), "--force", "--silent-input", "--no-web"); + System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath()); + // Some exceptions may happen during initialization, but that is OK as long as the basic core protocol starts + Artemis.main("run"); + Artemis.main("produce", "--txSize", "500"); + Artemis.main("consume", "--txSize", "500", "--verbose"); + Artemis.main("stop"); + Artemis.main("data", "print"); + Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS)); + + } + private void testCli(String... 
args) { try http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-distribution/src/main/assembly/dep.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 686ed6f..febc8ec 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -83,6 +83,7 @@ <include>org.apache.tomcat:tomcat-servlet-api</include> <include>commons-beanutils:commons-beanutils</include> <include>commons-logging:commons-logging</include> + <include>commons-collections:commons-collections</include> <include>org.fusesource.hawtbuf:hawtbuf</include> <include>org.jgroups:jgroups</include> </includes> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index 5be2aa2..2927f2d 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -16,15 +16,14 @@ */ package org.apache.activemq.artemis.jms.client; -import java.io.Serializable; -import java.util.UUID; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; +import java.io.Serializable; +import java.util.UUID; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; @@ -53,6 +52,17 @@ public class 
ActiveMQDestination implements Destination, Serializable, Reference public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic."; + public static final String QUEUE_QUALIFIED_PREFIX = "queue://"; + public static final String TOPIC_QUALIFIED_PREFIX = "topic://"; + public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://"; + public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://"; + public static final byte QUEUE_TYPE = 0x01; + public static final byte TOPIC_TYPE = 0x02; + public static final byte TEMP_MASK = 0x04; + public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK; + public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK; + + private static final char SEPARATOR = '.'; private static String escape(final String input) @@ -64,6 +74,44 @@ public class ActiveMQDestination implements Destination, Serializable, Reference return input.replace("\\", "\\\\").replace(".", "\\."); } + /** + * Static helper method for working with destinations. 
+ */ + public static ActiveMQDestination createDestination(String name, byte defaultType) + { + if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) + { + return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length())); + } + else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) + { + return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length())); + } + else if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) + { + return new ActiveMQQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()), true); + } + else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) + { + return new ActiveMQTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()), true); + } + + switch (defaultType) + { + case QUEUE_TYPE: + return new ActiveMQQueue(name); + case TOPIC_TYPE: + return new ActiveMQTopic(name); + case TEMP_QUEUE_TYPE: + return new ActiveMQQueue(name, true); + case TEMP_TOPIC_TYPE: + return new ActiveMQTopic(name, true); + default: + throw new IllegalArgumentException("Invalid default destination type: " + defaultType); + } + } + + public static Destination fromAddress(final String address) { if (address.startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX)) @@ -356,7 +404,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference return false; } - ActiveMQDestination that = (ActiveMQDestination)o; + ActiveMQDestination that = (ActiveMQDestination) o; return address.equals(that.address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java index 9143f34..0fac316 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java +++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java @@ -46,6 +46,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue super(JMS_QUEUE_ADDRESS_PREFIX + name, name, false, true, null); } + public ActiveMQQueue(final String name, boolean temporary) + { + super(JMS_QUEUE_ADDRESS_PREFIX + name, name, temporary, true, null); + } + /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf777ec6/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java index 88ea605..d7a6f38 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java @@ -43,7 +43,12 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic public ActiveMQTopic(final String name) { - super(JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null); + this(name, false); + } + + public ActiveMQTopic(final String name, boolean temporary) + { + super(JMS_TOPIC_ADDRESS_PREFIX + name, name, temporary, false, null); }
