Repository: activemq Updated Branches: refs/heads/master 3051882f9 -> ebb3df768
https://issues.apache.org/jira/browse/AMQ-5558 - support durable consumers and ack modes for consumers Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebb3df76 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebb3df76 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebb3df76 Branch: refs/heads/master Commit: ebb3df7681dcf4a17ed0036ec14cab22d2ab3bdf Parents: 3051882 Author: Dejan Bosanac <[email protected]> Authored: Tue Mar 24 13:17:50 2015 +0100 Committer: Dejan Bosanac <[email protected]> Committed: Tue Mar 24 13:18:10 2015 +0100 ---------------------------------------------------------------------- .../apache/activemq/util/ConsumerThread.java | 39 +++++++++--- .../apache/activemq/util/ProducerThread.java | 14 ++--- .../console/command/ConsumerCommand.java | 65 +++++++++++++++++--- .../console/command/ProducerCommand.java | 12 ++-- .../activemq/console/command/consumer.txt | 5 +- .../activemq/console/command/producer.txt | 2 +- 6 files changed, 103 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java index 86bcadb..18dd20a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java @@ -30,9 +30,10 @@ public class ConsumerThread extends Thread { int receiveTimeOut = 3000; Destination destination; Session session; + boolean durable; boolean breakOnNull = true; int sleep; - int transactionBatchSize; + int batchSize; int received = 0; int transactions = 0; @@ -52,7 +53,11 @@ public class ConsumerThread extends Thread { String threadName = Thread.currentThread().getName(); LOG.info(threadName + " wait until " + messageCount + " messages are consumed"); try { - consumer = session.createConsumer(destination); + if (durable && destination instanceof Topic) { + consumer = session.createDurableSubscriber((Topic) destination, getName()); + } else { + consumer = session.createConsumer(destination); + } while (running && received < messageCount) { Message msg = consumer.receive(receiveTimeOut); if (msg != null) { @@ -70,11 +75,17 @@ public class ConsumerThread extends Thread { } } - if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) { - LOG.info(threadName + " Committing transaction: " + transactions++); - session.commit(); + if (session.getTransacted()) { + if (batchSize > 0 && received > 0 && received % batchSize == 0) { + LOG.info(threadName + " Committing transaction: " + transactions++); + session.commit(); + } + } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { + if (batchSize > 0 && received > 0 && received % batchSize == 0) { + LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received); + msg.acknowledge(); + } } - if (sleep > 0) { Thread.sleep(sleep); } @@ -103,6 +114,14 @@ public class ConsumerThread extends Thread { return received; } + public boolean isDurable() { + return durable; + } + + public void setDurable(boolean durable) { + this.durable = durable; + } + public void setMessageCount(int messageCount) { this.messageCount = messageCount; } @@ -111,12 +130,12 @@ public class ConsumerThread extends Thread { this.breakOnNull = breakOnNull; } - public int getTransactionBatchSize() { - return transactionBatchSize; + public int getBatchSize() { + return batchSize; } - public void setTransactionBatchSize(int transactionBatchSize) { - this.transactionBatchSize = transactionBatchSize; + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } public int getMessageCount() { http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java index 638f60b..ffaa735 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java @@ -43,7 +43,7 @@ public class ProducerThread extends Thread { int sentCount = 0; String message; String messageText = null; - String url = null; + String payloadUrl = null; byte[] payload = null; boolean running = false; CountDownLatch finished; @@ -123,8 +123,8 @@ public class ProducerThread extends Thread { if (messageText == null) { messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i); } - } else if (url != null) { - messageText = readInputStream(new URL(url).openStream(), -1, i); + } else if (payloadUrl != null) { + messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i); } else if (message != null) { messageText = message; } else { @@ -249,12 +249,12 @@ public class ProducerThread extends Thread { this.finished = finished; } - public String getUrl() { - return url; + public String getPayloadUrl() { + return payloadUrl; } - public void setUrl(String url) { - this.url = url; + public void setPayloadUrl(String payloadUrl) { + this.payloadUrl = payloadUrl; } public String getMessage() { http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java index 58f37b8..9439f95 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java @@ -19,7 +19,6 @@ package org.apache.activemq.console.command; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.ConsumerThread; -import org.apache.activemq.util.IntrospectionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +36,11 @@ public class ConsumerCommand extends AbstractCommand { String destination = "queue://TEST"; int messageCount = 1000; int sleep; - int transactionBatchSize; + boolean transacted; + private boolean durable; + private String clientId; + int batchSize = 10; + int ackMode = Session.AUTO_ACKNOWLEDGE; int parallelThreads = 1; boolean bytesAsText; @@ -52,13 +55,16 @@ public class ConsumerCommand extends AbstractCommand { Connection conn = null; try { conn = factory.createConnection(user, password); + if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { + conn.setClientID(clientId); + } conn.start(); Session sess; - if (transactionBatchSize != 0) { + if (transacted) { sess = conn.createSession(true, Session.SESSION_TRANSACTED); } else { - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + sess = conn.createSession(false, ackMode); } @@ -67,10 +73,11 @@ public class ConsumerCommand extends AbstractCommand { for (int i = 1; i <= parallelThreads; i++) { ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE)); consumer.setName("consumer-" + i); + consumer.setDurable(durable); consumer.setBreakOnNull(false); consumer.setMessageCount(messageCount); consumer.setSleep(sleep); - consumer.setTransactionBatchSize(transactionBatchSize); + consumer.setBatchSize(batchSize); consumer.setFinished(active); consumer.setBytesAsText(bytesAsText); consumer.start(); @@ -132,12 +139,12 @@ public class ConsumerCommand extends AbstractCommand { this.sleep = sleep; } - public int getTransactionBatchSize() { - return transactionBatchSize; + public int getBatchSize() { + return batchSize; } - public void setTransactionBatchSize(int transactionBatchSize) { - this.transactionBatchSize = transactionBatchSize; + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } public int getParallelThreads() { @@ -156,6 +163,46 @@ public class ConsumerCommand extends AbstractCommand { this.bytesAsText = bytesAsText; } + public boolean isTransacted() { + return transacted; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + public int getAckMode() { + return ackMode; + } + + public void setAckMode(String ackMode) { + if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) { + this.ackMode = Session.CLIENT_ACKNOWLEDGE; + } + if ("AUTO_ACKNOWLEDGE".equals(ackMode)) { + this.ackMode = Session.AUTO_ACKNOWLEDGE; + } + if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) { + this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; + } + } + + public boolean isDurable() { + return durable; + } + + public void setDurable(boolean durable) { + this.durable = durable; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + @Override protected void printHelp() { printHelpFromFile(); http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java index e4c53fe..8138a2c 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java @@ -38,7 +38,7 @@ public class ProducerCommand extends AbstractCommand { int sleep = 0; boolean persistent = true; String message = null; - String url = null; + String payloadUrl = null; int messageSize = 0; int textMessageSize; long msgTTL = 0L; @@ -78,7 +78,7 @@ public class ProducerCommand extends AbstractCommand { producer.setPersistent(persistent); producer.setTransactionBatchSize(transactionBatchSize); producer.setMessage(message); - producer.setUrl(url); + producer.setPayloadUrl(payloadUrl); producer.setMessageSize(messageSize); producer.setMsgGroupID(msgGroupID); producer.setTextMessageSize(textMessageSize); @@ -198,12 +198,12 @@ public class ProducerCommand extends AbstractCommand { this.parallelThreads = parallelThreads; } - public String getUrl() { - return url; + public String getPayloadUrl() { + return payloadUrl; } - public void setUrl(String url) { - this.url = url; + public void setPayloadUrl(String payloadUrl) { + this.payloadUrl = payloadUrl; } public String getMessage() { http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt index 986a771..9cf7c55 100644 --- a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt +++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt @@ -7,6 +7,9 @@ Options : [--destination queue://..|topic://..] - consumer destination; default queue://TEST [--messageCount N] - number of messages to send; default 1000 [--sleep N] - millisecond sleep period between sends or receives; default 0 - [--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions + [--ackMode AUTO_ACKNOWLEDGE|CLIENT_ACKNOWLEDGE] - the type of message acknowledgement to use; default auto acknowledge + [--batchSize N] - batch size for transactions and client acknowledgment (default 10) + [--durable true|false] - create durable topic + [--clientId ..] - connection client id; must be set for durable topics [--parallelThreads N] - number of threads to run in parallel; default 1 [--bytesAsText true|false] - try to treat a BytesMessage as a text string \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt index 65f437a..1f37586 100644 --- a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt +++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt @@ -14,5 +14,5 @@ Options : [--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used [--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used [--message ..] - a text string to use as the message body - [--url URL] - a url pointing to a document to use as the message body + [--payloadUrl URL] - a url pointing to a document to use as the message body [--msgGroupID ..] - JMS message group identifier \ No newline at end of file
