Repository: activemq Updated Branches: refs/heads/master 85d9d4e94 -> df3ff9c65
https://issues.apache.org/jira/browse/AMQ-5558 - some more options for producer/consumer tools Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/df3ff9c6 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/df3ff9c6 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/df3ff9c6 Branch: refs/heads/master Commit: df3ff9c65e0567dc6bc18de769c47e65623ccb71 Parents: 85d9d4e Author: Dejan Bosanac <[email protected]> Authored: Mon Mar 23 14:46:14 2015 +0100 Committer: Dejan Bosanac <[email protected]> Committed: Mon Mar 23 14:46:35 2015 +0100 ---------------------------------------------------------------------- .../apache/activemq/util/ConsumerThread.java | 15 ++++ .../apache/activemq/util/ProducerThread.java | 84 ++++++++++++++------ .../console/command/ConsumerCommand.java | 11 ++- .../console/command/ProducerCommand.java | 22 ++++- .../activemq/console/command/consumer.txt | 3 +- .../activemq/console/command/producer.txt | 2 + 6 files changed, 109 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java index 402b2a5..86bcadb 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java @@ -38,6 +38,7 @@ public class ConsumerThread extends Thread { int transactions = 0; boolean running = false; CountDownLatch finished; + boolean bytesAsText; public ConsumerThread(Session session, Destination destination) { this.destination = destination; @@ -56,6 +57,12 @@ public class ConsumerThread extends Thread { Message msg = consumer.receive(receiveTimeOut); if (msg != null) { LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + LOG.info("BytesMessage as text string: " + new String(bytes)); + } received++; } else { if (breakOnNull) { @@ -151,4 +158,12 @@ public class ConsumerThread extends Thread { public void setFinished(CountDownLatch finished) { this.finished = finished; } + + public boolean isBytesAsText() { + return bytesAsText; + } + + public void setBytesAsText(boolean bytesAsText) { + this.bytesAsText = bytesAsText; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java index ad44259..638f60b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java @@ -20,10 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; -import java.io.File; -import java.io.FileReader; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.net.URL; import java.util.concurrent.CountDownLatch; @@ -44,6 +41,9 @@ public class ProducerThread extends Thread { int transactions = 0; int sentCount = 0; + String message; + String messageText = null; + String url = null; byte[] payload = null; boolean running = false; CountDownLatch finished; @@ -114,35 +114,55 @@ public class ProducerThread extends Thread { } protected Message createMessage(int i) throws Exception { - Message message = null; + Message answer; if (payload != null) { - message = session.createBytesMessage(); - ((BytesMessage)message).writeBytes(payload); + answer = session.createBytesMessage(); + ((BytesMessage) answer).writeBytes(payload); } else { if (textMessageSize > 0) { - InputStreamReader reader = null; - try { - InputStream is = getClass().getResourceAsStream("demo.txt"); - reader = new InputStreamReader(is); - char[] chars = new char[textMessageSize]; - reader.read(chars); - message = session.createTextMessage(String.valueOf(chars)); - } catch (Exception e) { - LOG.warn(Thread.currentThread().getName() + " Failed to load " + textMessageSize + " bytes of demo text. Using default text message instead"); - message = session.createTextMessage("test message: " + i); - } finally { - if (reader != null) { - reader.close(); - } + if (messageText == null) { + messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i); } + } else if (url != null) { + messageText = readInputStream(new URL(url).openStream(), -1, i); + } else if (message != null) { + messageText = message; } else { - message = session.createTextMessage("test message: " + i); + messageText = createDefaultMessage(i); } + answer = session.createTextMessage(messageText); } if ((msgGroupID != null) && (!msgGroupID.isEmpty())) { - message.setStringProperty("JMSXGroupID", msgGroupID); + answer.setStringProperty("JMSXGroupID", msgGroupID); } - return message; + return answer; + } + + private String readInputStream(InputStream is, int size, int messageNumber) throws IOException { + InputStreamReader reader = new InputStreamReader(is); + try { + char[] buffer; + if (size > 0) { + buffer = new char[size]; + } else { + buffer = new char[1024]; + } + int count; + StringBuilder builder = new StringBuilder(); + while ((count = reader.read(buffer)) != -1) { + builder.append(buffer, 0, count); + if (size > 0) break; + } + return builder.toString(); + } catch (IOException ioe) { + return createDefaultMessage(messageNumber); + } finally { + reader.close(); + } + } + + private String createDefaultMessage(int messageNumber) { + return "test message: " + messageNumber; } public void setMessageCount(int messageCount) { @@ -228,4 +248,20 @@ public class ProducerThread extends Thread { public void setFinished(CountDownLatch finished) { this.finished = finished; } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java index 3998610..58f37b8 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java @@ -39,6 +39,7 @@ public class ConsumerCommand extends AbstractCommand { int sleep; int transactionBatchSize; int parallelThreads = 1; + boolean bytesAsText; @Override protected void runTask(List<String> tokens) throws Exception { @@ -71,6 +72,7 @@ public class ConsumerCommand extends AbstractCommand { consumer.setSleep(sleep); consumer.setTransactionBatchSize(transactionBatchSize); consumer.setFinished(active); + consumer.setBytesAsText(bytesAsText); consumer.start(); } @@ -146,6 +148,14 @@ public class ConsumerCommand extends AbstractCommand { this.parallelThreads = parallelThreads; } + public boolean isBytesAsText() { + return bytesAsText; + } + + public void setBytesAsText(boolean bytesAsText) { + this.bytesAsText = bytesAsText; + } + @Override protected void printHelp() { printHelpFromFile(); @@ -160,5 +170,4 @@ public class ConsumerCommand extends AbstractCommand { public String getOneLineDescription() { return "Receives messages from the broker"; } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java index 5f3ac92..e4c53fe 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java @@ -18,14 +18,12 @@ package org.apache.activemq.console.command; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.ProducerThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.Session; -import java.io.*; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -39,6 +37,8 @@ public class ProducerCommand extends AbstractCommand { int messageCount = 1000; int sleep = 0; boolean persistent = true; + String message = null; + String url = null; int messageSize = 0; int textMessageSize; long msgTTL = 0L; @@ -77,6 +77,8 @@ public class ProducerCommand extends AbstractCommand { producer.setMsgTTL(msgTTL); producer.setPersistent(persistent); producer.setTransactionBatchSize(transactionBatchSize); + producer.setMessage(message); + producer.setUrl(url); producer.setMessageSize(messageSize); producer.setMsgGroupID(msgGroupID); producer.setTextMessageSize(textMessageSize); @@ -196,6 +198,22 @@ public class ProducerCommand extends AbstractCommand { this.parallelThreads = parallelThreads; } + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + @Override protected void printHelp() { printHelpFromFile(); http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt index bc6f4df..986a771 100644 --- a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt +++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt @@ -8,4 +8,5 @@ Options : [--messageCount N] - number of messages to send; default 1000 [--sleep N] - millisecond sleep period between sends or receives; default 0 [--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions - [--parallelThreads N] - number of threads to run in parallel; default 1 \ No newline at end of file + [--parallelThreads N] - number of threads to run in parallel; default 1 + [--bytesAsText true|false] - try to treat a BytesMessage as a text string \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt index 0e183fb..65f437a 100644 --- a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt +++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt @@ -13,4 +13,6 @@ Options : [--msgTTL N] - message TTL in milliseconds [--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used [--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used + [--message ..] - a text string to use as the message body + [--url URL] - a url pointing to a document to use as the message body [--msgGroupID ..] - JMS message group identifier \ No newline at end of file
