This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 6ce60a894b1bed29d8bd1602d936cf9c32581b77 Author: gosonzhang <[email protected]> AuthorDate: Wed Dec 9 15:38:34 2020 +0800 [TUBEMQ-444]Add consume and produce Cli commands (#343) Co-authored-by: gosonzhang <[email protected]> --- ...nsumer-perf-test.sh => tubemq-consumer-test.sh} | 2 +- ...oducer-perf-test.sh => tubemq-producer-test.sh} | 2 +- .../apache/tubemq/corebase/TErrCodeConstants.java | 6 + .../apache/tubemq/corebase/utils/MixedUtils.java | 59 +++ tubemq-example/pom.xml | 10 - tubemq-example/src/main/assembly/assembly.xml | 3 - .../apache/tubemq/example/ArgsParserHelper.java | 48 --- .../tubemq/example/MAMessageProducerExample.java | 100 ++---- .../tubemq/example/MessageConsumerExample.java | 125 +++---- .../tubemq/server/common/fielddef/CliArgDef.java | 70 ++-- .../tubemq/server/tools/cli/CliAbstractBase.java | 76 ++++ .../tubemq/server/tools/cli/CliConsumer.java | 394 +++++++++++++++++++++ .../tubemq/server/tools/cli/CliProducer.java | 385 ++++++++++++++++++++ 13 files changed, 1037 insertions(+), 243 deletions(-) diff --git a/bin/tubemq-consumer-perf-test.sh b/bin/tubemq-consumer-test.sh similarity index 94% rename from bin/tubemq-consumer-perf-test.sh rename to bin/tubemq-consumer-test.sh index 54189a2..23292fb 100644 --- a/bin/tubemq-consumer-perf-test.sh +++ b/bin/tubemq-consumer-test.sh @@ -37,4 +37,4 @@ if [ -z "$BASE_DIR" ] ; then #echo "TubeMQ master is at $BASE_DIR" fi source $BASE_DIR/bin/env.sh -$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample $@ +$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliConsumer $@ diff --git a/bin/tubemq-producer-perf-test.sh b/bin/tubemq-producer-test.sh similarity index 94% rename from bin/tubemq-producer-perf-test.sh rename to bin/tubemq-producer-test.sh index ed76c3a..0f96033 100644 --- a/bin/tubemq-producer-perf-test.sh +++ b/bin/tubemq-producer-test.sh @@ -37,4 +37,4 @@ if [ -z "$BASE_DIR" ] ; then #echo "TubeMQ master is at $BASE_DIR" fi source $BASE_DIR/bin/env.sh -$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample $@ +$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliProducer $@ diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java index 441af38..8bdafa0 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TErrCodeConstants.java @@ -18,6 +18,9 @@ package org.apache.tubemq.corebase; +import java.util.Arrays; +import java.util.List; + public class TErrCodeConstants { public static final int SUCCESS = 200; public static final int NOT_READY = 201; @@ -44,4 +47,7 @@ public class TErrCodeConstants { public static final int SERVICE_UNAVAILABLE = 503; public static final int INTERNAL_SERVER_ERROR_MSGSET_NULL = 510; + public static final List<Integer> IGNORE_ERROR_SET = + Arrays.asList(BAD_REQUEST, NOT_FOUND, ALL_PARTITION_FROZEN, + NO_PARTITION_ASSIGNED, ALL_PARTITION_WAITING, ALL_PARTITION_INUSE); } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java index 17ccb5e..bfbedad 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java @@ -17,6 +17,16 @@ package org.apache.tubemq.corebase.utils; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.commons.codec.binary.StringUtils; +import org.apache.tubemq.corebase.TokenConstants; + + + public class MixedUtils { // java version cache private static String javaVersion = ""; @@ -33,4 +43,53 @@ public class MixedUtils { return javaVersion.substring(0, maxLen); } } + + /** + * parse topic Parameter with format topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]] + * topicParam->set(filterCond) map + * @param topicParam - composite string + * @return - map of topic->set(filterCond) + */ + public static Map<String, TreeSet<String>> parseTopicParam(String topicParam) { + Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>(); + if (TStringUtils.isBlank(topicParam)) { + return topicAndFiltersMap; + } + String[] topicFilterStrs = topicParam.split(TokenConstants.ARRAY_SEP); + for (String topicFilterStr : topicFilterStrs) { + if (TStringUtils.isBlank(topicFilterStr)) { + continue; + } + String[] topicFilters = topicFilterStr.split(TokenConstants.ATTR_SEP); + if (TStringUtils.isBlank(topicFilters[0])) { + continue; + } + TreeSet<String> filterSet = new TreeSet<>(); + if (topicFilters.length > 1 + && TStringUtils.isNotBlank(topicFilters[1])) { + String[] filterItems = topicFilters[1].split(TokenConstants.LOG_SEG_SEP); + for (String filterItem : filterItems) { + if (TStringUtils.isBlank(filterItem)) { + continue; + } + filterSet.add(filterItem.trim()); + } + } + topicAndFiltersMap.put(topicFilters[0].trim(), filterSet); + } + return topicAndFiltersMap; + } + + public static byte[] buildTestData(int bodySize) { + final byte[] transmitData = + StringUtils.getBytesUtf8("This is a test data!"); + final ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize); + while (dataBuffer.hasRemaining()) { + int offset = dataBuffer.arrayOffset(); + dataBuffer.put(transmitData, offset, + Math.min(dataBuffer.remaining(), transmitData.length)); + } + dataBuffer.flip(); + return dataBuffer.array(); + } } diff --git a/tubemq-example/pom.xml b/tubemq-example/pom.xml index 319acab..9ab6775 100644 --- a/tubemq-example/pom.xml +++ b/tubemq-example/pom.xml @@ -65,16 +65,6 @@ <groupId>org.apache.tubemq</groupId> <artifactId>tubemq-client</artifactId> </dependency> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>test</scope> - </dependency> </dependencies> </project> diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml index 596af24..11edd32 100644 --- a/tubemq-example/src/main/assembly/assembly.xml +++ b/tubemq-example/src/main/assembly/assembly.xml @@ -32,9 +32,6 @@ <directory>../</directory> <includes> <include>./conf/tools.log4j.properties</include> - <include>./bin/tubemq-consumer-perf-test.sh</include> - <include>./bin/tubemq-producer-perf-test.sh</include> - <include>./bin/env.sh</include> <include>LICENSE</include> <include>NOTICE</include> <include>DISCLAIMER-WIP</include> diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java deleted file mode 100644 index c507ae4..0000000 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tubemq.example; - -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -public class ArgsParserHelper { - - /** - * Print help information and exit. - * - * @param opts - options - */ - public static void help(String commandName, Options opts) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(commandName, opts); - System.exit(0); - } - - /** - * Init common options when parsing args. - * @return - options - */ - public static Options initCommonOptions() { - Options options = new Options(); - options.addOption(null, "help", false, "show help"); - options.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000"); - options.addOption(null, "topic", true, "topic list, topic1,topic2 or " - + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)"); - return options; - } -} diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java index 9c76ce1..f76b077 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java @@ -30,13 +30,8 @@ import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.Options; import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.client.config.TubeClientConfig; import org.apache.tubemq.client.exception.TubeClientException; @@ -64,7 +59,7 @@ public class MAMessageProducerExample { private static final int SESSION_FACTORY_NUM = 10; private static Set<String> topicSet; - private static int batchCount; + private static int msgCount; private static int producerCount; private static byte[] sendData; @@ -94,72 +89,45 @@ public class MAMessageProducerExample { } } - /** - * Init options - * - * @return options - */ - public static Options initOptions() { - Options options = ArgsParserHelper.initCommonOptions(); - options.addOption(null, "batch-size", true, "number of messages in single batch, default is 100000"); - options.addOption(null, "max-batch", true, "max batch number, default is 1024"); - options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100"); - return options; - } - public static void main(String[] args) { - Options options = null; - try { - CommandLineParser parser = new DefaultParser(); - options = initOptions(); - CommandLine cl = parser.parse(options, args); - if (cl != null) { - final String masterHostAndPort = cl.getOptionValue("master-list"); - final String topics = cl.getOptionValue("topic"); - final List<String> topicList = Arrays.asList(topics.split(",")); - topicSet = new TreeSet<>(topicList); - - batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000")); - int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024")); - producerCount = Math.min(Integer.parseInt(cl.getOptionValue( - "thread-num", "1")), MAX_PRODUCER_NUM); - logger.info("MAMessageProducerExample.main started..."); - final byte[] transmitData = StringUtils - .getBytesUtf8("This is a test message from multi-session factory."); - final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize); - - while (dataBuffer.hasRemaining()) { - int offset = dataBuffer.arrayOffset(); - dataBuffer.put(transmitData, offset, - Math.min(dataBuffer.remaining(), transmitData.length)); - } + final String masterHostAndPort = args[0]; - dataBuffer.flip(); - sendData = dataBuffer.array(); + final String topics = args[1]; + final List<String> topicList = Arrays.asList(topics.split(",")); - try { - MAMessageProducerExample messageProducer = new MAMessageProducerExample( - masterHostAndPort); + topicSet = new TreeSet<>(topicList); - messageProducer.startService(); + msgCount = Integer.parseInt(args[2]); + producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM); - while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) { - TimeUnit.MILLISECONDS.sleep(1000); - } - messageProducer.producerMap.clear(); - messageProducer.shutdown(); + logger.info("MAMessageProducerExample.main started..."); - } catch (TubeClientException e) { - logger.error("TubeClientException: ", e); - } catch (Throwable e) { - logger.error("Throwable: ", e); - } - } - } catch (Exception ex) { - logger.error(ex.getMessage()); - if (options != null) { - ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options); + final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory."); + final ByteBuffer dataBuffer = ByteBuffer.allocate(1024); + + while (dataBuffer.hasRemaining()) { + int offset = dataBuffer.arrayOffset(); + dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length)); + } + + dataBuffer.flip(); + sendData = dataBuffer.array(); + + try { + MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort); + + messageProducer.startService(); + + while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) { + Thread.sleep(1000); } + messageProducer.producerMap.clear(); + messageProducer.shutdown(); + + } catch (TubeClientException e) { + logger.error("TubeClientException: ", e); + } catch (Throwable e) { + logger.error("Throwable: ", e); } } @@ -205,7 +173,7 @@ public class MAMessageProducerExample { } catch (Throwable t) { logger.error("publish exception: ", t); } - for (int i = 0; i < batchCount; i++) { + for (int i = 0; i < msgCount; i++) { long millis = System.currentTimeMillis(); for (String topic : topicSet) { try { diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java index 3b99943..d9aeb8a 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java @@ -26,10 +26,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.Options; import org.apache.tubemq.client.common.PeerInfo; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumePosition; @@ -63,45 +59,29 @@ public final class MessageConsumerExample { private static final MsgRecvStats msgRecvStats = new MsgRecvStats(); private final PushMessageConsumer messageConsumer; + private final MessageSessionFactory messageSessionFactory; - public MessageConsumerExample(String masterHostAndPort, String group, - int fetchCount, boolean isFromBegin) throws Exception { + public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); - if (isFromBegin) { - consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET); - } else { - consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); - } + consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); if (fetchCount > 0) { consumerConfig.setPushFetchThreadCnt(fetchCount); } - MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); + this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig); } - /** - * Init options - * @return options - */ - public static Options initOptions() { - - Options options = ArgsParserHelper.initCommonOptions(); - options.addOption(null, "batch-size", true, "max number of fetching message in one batch"); - options.addOption(null, "thread-num", true, "thread number of consumers"); - options.addOption(null, "group", true, "consumer group"); - options.addOption(null, "from-begin", false, "default is consuming from latest, " - + "if option is clarified, then consume from begin"); - return options; - - } + public static void main(String[] args) { + final String masterHostAndPort = args[0]; + final String topics = args[1]; + final String group = args[2]; + final int consumerCount = Integer.parseInt(args[3]); + int fetchCount = -1; + if (args.length > 5) { + fetchCount = Integer.parseInt(args[4]); + } + final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>(); - /** - * init topic->set(tid) map - * @param topics - topics string - * @return - map of topic->set(tid) - */ - private static Map<String, TreeSet<String>> initTopicList(String topics) { - Map<String, TreeSet<String>> topicTidsMap = new HashMap<>(); String[] topicTidsList = topics.split(","); for (String topicTids : topicTidsList) { String[] topicTidStr = topicTids.split(":"); @@ -115,60 +95,35 @@ public final class MessageConsumerExample { } topicTidsMap.put(topicTidStr[0], tids); } - return topicTidsMap; - } + final int startFetchCount = fetchCount; + final ExecutorService executorService = Executors.newFixedThreadPool(fetchCount); + for (int i = 0; i < consumerCount; i++) { + executorService.submit(new Runnable() { + @Override + public void run() { + try { + MessageConsumerExample messageConsumer = new MessageConsumerExample( + masterHostAndPort, + group, + startFetchCount + ); + messageConsumer.subscribe(topicTidsMap); + } catch (Exception e) { + logger.error("Create consumer failed!", e); + } + } + }); + } + final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread"); + statisticThread.start(); - public static void main(String[] args) { - Options options = null; + executorService.shutdown(); try { - CommandLineParser parser = new DefaultParser(); - options = initOptions(); - CommandLine cl = parser.parse(options, args); - if (cl != null) { - final String masterHostAndPort = cl.getOptionValue("master-list"); - final Map<String, TreeSet<String>> topicTidsMap = initTopicList( - cl.getOptionValue("topic")); - final String group = cl.getOptionValue("group"); - int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1")); - final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1")); - final boolean isFromBegin = cl.hasOption("from-begin"); - ExecutorService executorService = Executors.newFixedThreadPool(threadNum); - for (int i = 0; i < threadNum; i++) { - executorService.submit(new Runnable() { - @Override - public void run() { - try { - MessageConsumerExample messageConsumer = new MessageConsumerExample( - masterHostAndPort, - group, - fetchCount, - isFromBegin - ); - messageConsumer.subscribe(topicTidsMap); - } catch (Exception e) { - logger.error("Create consumer failed!", e); - } - } - }); - } - final Thread statisticThread = new Thread(msgRecvStats, - "Received Statistic Thread"); - statisticThread.start(); - - executorService.shutdown(); - try { - executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - logger.error("Thread Pool shutdown has been interrupted!"); - } - msgRecvStats.stopStats(); - } - } catch (Exception ex) { - logger.error(ex.getMessage(), ex.getMessage()); - if (options != null) { - ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options); - } + executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Thread Pool shutdown has been interrupted!"); } + msgRecvStats.stopStats(); } public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java index b2d9327..abb2e2a 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java @@ -27,64 +27,76 @@ public enum CliArgDef { HELP("h", "help", "Print usage information."), VERSION("v", "version", "Display TubeMQ version."), - MASTERSERVER("master-servers", "master-servers", + MASTERSERVER(null, "master-servers", "String: format is master1_ip:port[,master2_ip:port]", "The master address(es) to connect to."), - MASTERURL("master-url", "master-url", + MASTERURL(null, "master-url", "String: format is http://master_ip:master_webport/", "Master Service URL to which to connect.(default: http://localhost:8080/)"), - BROKERURL("broker-url", "broker-url", + BROKERURL(null, "broker-url", "String: format is http://broker_ip:broker_webport/", "Broker Service URL to which to connect.(default: http://localhost:8081/)"), - MESSAGES("messages", "messages", + MESSAGES(null, "messages", "Long: count", "The number of messages to send or consume, If not set, production or consumption is continual."), - MSGDATASIZE("msg-data-size", "message-data-size", - "Int: message size", + MSGDATASIZE(null, "message-data-size", + "Int: message size,(0, 1024 * 1024)", "message's data size in bytes. Note that you must provide exactly" + " one of --msg-data-size or --payload-file."), - PAYLOADFILE("payload-file", "payload-file", + PAYLOADFILE(null, "payload-file", "String: payload file path", "file to read the message payloads from. This works only for" + " UTF-8 encoded text files. Payloads will be read from this" + " file and a payload will be randomly selected when sending" + " messages. Note that you must provide exactly one" + " of --msg-data-size or --payload-file."), - PAYLOADDELIM("payload-delimiter", "payload-delimiter", + PAYLOADDELIM(null, "payload-delimiter", "String: payload data's delimiter", "provides delimiter to be used when --payload-file is provided." + " Defaults to new line. Note that this parameter will be" + " ignored if --payload-file is not provided. (default: \\n)"), PRDTOPIC("topic", "topicName", - "String: topic, format is topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]", + "String: topic, format is topic_1[,topic_2[:filterCond_2.1[\\;filterCond_2.2]]]", "The topic(s) to produce messages to."), CNSTOPIC("topic", "topicName", - "String: topic, format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]", + "String: topic, format is topic_1[[:filterCond_1.1[\\;filterCond_1.2]][,topic_2]]", "The topic(s) to consume on."), - RPCTIMEOUT("timeout", "timeout", + RPCTIMEOUT(null, "rpc-timeout", "Long: milliseconds", "The maximum duration between request and response in milliseconds. (default: 10000)"), + CONNREUSE(null, "conn-reuse", + "bool: true or false", + "Different clients reuse TCP connections. (default: true)"), GROUP("group", "groupName", "String: consumer group", - "The consumer group name of the consumer."), - CLIENTCOUNT("client-num", "client-num", - "Int: client count", - "Number of consumers to started."), - PULLMODEL("pull-model", "pull-model", - "Pull consumption model."), - PUSHMODEL("push-model", "push-model", - "Push consumption model."), - FETCHTHREADS("num-fetch-threads", "num-fetch-threads", - "Integer: count", + "The consumer group name of the consumer. (default: test_consume)"), + CLIENTCOUNT(null, "client-count", + "Int: client count, [1, 100]", + "Number of producers or consumers to started."), + PUSHCONSUME(null, "consume-push", + "Push consumption action.(default: pull)"), + FETCHTHREADS(null, "num-fetch-threads", + "Integer: count, [1,100]", "Number of fetch threads, default: num of cpu count."), - FROMLATEST("from-latest", "from-latest", - "Start to consume from the latest message present in the log."), - FROMBEGINNING("from-beginning", "from-beginning", - "If the consumer does not already have an established offset to consume from," - + " start with the earliest message present in the log rather than the latest message."), - OUTPUTINTERVAL("output-interval", "output-interval", - "Integer: interval_ms", - "Interval in milliseconds at which to print progress info. (default: 5000)"); + SENDTHREADS(null, "num-send-threads", + "Integer: count, [1,200]", + "Number of send message threads, default: num of cpu count."), + CONSUMEPOS(null, "consume-position", + "Integer: [-1,0, 1]", + "Set the start position of the consumer group. The value can be [-1, 0, 1]." + + " Default value is 0. -1: Start from 0 for the first time." + + " Otherwise start from last consume position." + + " 0: Start from the latest position for the first time." + + " Otherwise start from last consume position." + + " 1: Start from the latest consume position."), + OUTPUTINTERVAL(null, "output-interval", + "Integer: interval_ms, [5000, +)", + "Interval in milliseconds at which to print progress info. (default: 5000)"), + SYNCPRODUCE(null, "sync-produce", + "Synchronous production. (default: false)"), + WITHOUTDELAY(null, "without-delay", + "Production without delay. (default: false)"); + CliArgDef(String opt, String longOpt, String optDesc) { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java new file mode 100644 index 0000000..c0903f2 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliAbstractBase.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.tools.cli; + +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.tubemq.server.common.TubeServerVersion; +import org.apache.tubemq.server.common.fielddef.CliArgDef; + + +public abstract class CliAbstractBase { + + protected final String commandName; + protected Options options = new Options(); + protected CommandLineParser parser = new DefaultParser(); + private HelpFormatter formatter = new HelpFormatter(); + + public CliAbstractBase(String commandName) { + this.commandName = commandName; + addCommandOption(CliArgDef.HELP); + addCommandOption(CliArgDef.VERSION); + formatter.setWidth(500); + } + + /** + * Print help information and exit. + * + */ + public void help() { + formatter.printHelp(commandName, options); + System.exit(0); + } + + /** + * Print tubemq server version. + * + */ + public void version() { + System.out.println("TubeMQ " + TubeServerVersion.BROKER_VERSION); + System.exit(0); + } + + public void addCommandOption(CliArgDef cliArgDef) { + Option option = new Option(cliArgDef.opt, + cliArgDef.longOpt, cliArgDef.hasArg, cliArgDef.optDesc); + if (cliArgDef.hasArg) { + option.setArgName(cliArgDef.argDesc); + } + options.addOption(option); + } + + + protected abstract void initCommandOptions(); + + + public abstract boolean parseParams(String[] args) throws Exception; + +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java new file mode 100644 index 0000000..666e6f9 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java @@ -0,0 +1,394 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.tools.cli; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; +import org.apache.tubemq.client.common.PeerInfo; +import org.apache.tubemq.client.config.ConsumerConfig; +import org.apache.tubemq.client.consumer.ConsumePosition; +import org.apache.tubemq.client.consumer.ConsumerResult; +import org.apache.tubemq.client.consumer.MessageConsumer; +import org.apache.tubemq.client.consumer.MessageV2Listener; +import org.apache.tubemq.client.consumer.PullMessageConsumer; +import org.apache.tubemq.client.consumer.PushMessageConsumer; +import org.apache.tubemq.client.exception.TubeClientException; +import org.apache.tubemq.client.factory.MessageSessionFactory; +import org.apache.tubemq.client.factory.TubeMultiSessionFactory; +import org.apache.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.corebase.TErrCodeConstants; +import org.apache.tubemq.corebase.utils.MixedUtils; +import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.ThreadUtils; +import org.apache.tubemq.server.common.fielddef.CliArgDef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * This class is use to process CLI Consumer process. + * + * + */ +public class CliConsumer extends CliAbstractBase { + + private static final Logger logger = + LoggerFactory.getLogger(CliConsumer.class); + // statistic data index + private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0); + // sent data content + private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>(); + private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(); + private final Map<MessageConsumer, TupleValue> consumerMap = new HashMap<>(); + // cli parameters + private String masterServers; + private String groupName = "test_consume"; + private ConsumePosition consumePos = + ConsumePosition.CONSUMER_FROM_LATEST_OFFSET; + private long msgCount = TBaseConstants.META_VALUE_UNDEFINED; + private long rpcTimeoutMs = TBaseConstants.META_VALUE_UNDEFINED; + private boolean reuseConn = false; + private int clientCount = 1; + private int fetchThreadCnt = + Runtime.getRuntime().availableProcessors(); + private long printIntervalMs = 5000; + private boolean isPushConsume = false; + + private boolean isStarted = false; + + + public CliConsumer() { + super("tubemq-consumer-test.sh"); + initCommandOptions(); + } + + /** + * Init command options + */ + protected void initCommandOptions() { + // add the cli required parameters + addCommandOption(CliArgDef.MASTERSERVER); + addCommandOption(CliArgDef.MESSAGES); + addCommandOption(CliArgDef.CNSTOPIC); + addCommandOption(CliArgDef.RPCTIMEOUT); + addCommandOption(CliArgDef.GROUP); + addCommandOption(CliArgDef.CONNREUSE); + addCommandOption(CliArgDef.PUSHCONSUME); + addCommandOption(CliArgDef.CONSUMEPOS); + addCommandOption(CliArgDef.FETCHTHREADS); + addCommandOption(CliArgDef.CLIENTCOUNT); + addCommandOption(CliArgDef.OUTPUTINTERVAL); + addCommandOption(CliArgDef.WITHOUTDELAY); + } + + public boolean parseParams(String[] args) throws Exception { + // parse parameters and check value + CommandLine cli = parser.parse(options, args); + if (cli == null) { + throw new ParseException("Parse args failure"); + } + if (cli.hasOption(CliArgDef.VERSION.longOpt)) { + version(); + } + if (cli.hasOption(CliArgDef.HELP.longOpt)) { + help(); + } + masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt); + if (TStringUtils.isBlank(masterServers)) { + throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!"); + } + String topicStr = cli.getOptionValue(CliArgDef.CNSTOPIC.longOpt); + if (TStringUtils.isBlank(topicStr)) { + throw new Exception(CliArgDef.CNSTOPIC.longOpt + " is required!"); + } + topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(topicStr)); + if (topicAndFiltersMap.isEmpty()) { + throw new Exception("Invalid " + CliArgDef.CNSTOPIC.longOpt + " parameter value!"); + } + String msgCntStr = cli.getOptionValue(CliArgDef.MESSAGES.longOpt); + if (TStringUtils.isNotBlank(msgCntStr)) { + msgCount = Long.parseLong(msgCntStr); + } + String groupNameStr = cli.getOptionValue(CliArgDef.GROUP.longOpt); + if (TStringUtils.isNotBlank(groupNameStr)) { + groupName = cli.getOptionValue(CliArgDef.GROUP.longOpt); + } + String reuseConnStr = cli.getOptionValue(CliArgDef.CONNREUSE.longOpt); + if (TStringUtils.isNotBlank(reuseConnStr)) { + reuseConn = Boolean.parseBoolean(reuseConnStr); + } + String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt); + if (TStringUtils.isNotBlank(rpcTimeoutStr)) { + rpcTimeoutMs = Long.parseLong(rpcTimeoutStr); + } + String clientCntStr = cli.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt); + if (TStringUtils.isNotBlank(clientCntStr)) { + clientCount = Integer.parseInt(clientCntStr); + } + String printIntMsStr = cli.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt); + if (TStringUtils.isNotBlank(printIntMsStr)) { + printIntervalMs = Long.parseLong(printIntMsStr); + if (printIntervalMs < 5000) { + throw new Exception("Invalid " + + CliArgDef.OUTPUTINTERVAL.longOpt + + " parameter value!"); + } + } + String consumePosStr = cli.getOptionValue(CliArgDef.CONSUMEPOS.longOpt); + if (TStringUtils.isNotBlank(consumePosStr)) { + int tmpPosId = Integer.parseInt(consumePosStr); + if (tmpPosId > 0) { + consumePos = ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS; + } else if (tmpPosId < 0) { + consumePos = ConsumePosition.CONSUMER_FROM_FIRST_OFFSET; + } else { + consumePos = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET; + } + } + if (cli.hasOption(CliArgDef.PUSHCONSUME.longOpt)) { + isPushConsume = true; + } + String fetchThreadCntStr = cli.getOptionValue(CliArgDef.FETCHTHREADS.longOpt); + if (TStringUtils.isNotBlank(fetchThreadCntStr)) { + int tmpFetchThreadCnt = Integer.parseInt(fetchThreadCntStr); + tmpFetchThreadCnt = (tmpFetchThreadCnt < 1) ? 1 : Math.min(tmpFetchThreadCnt, 100); + fetchThreadCnt = tmpFetchThreadCnt; + } + return true; + } + + // initial tubemq client order by caller required + public void initTask() throws Exception { + // initial consumer configure + ConsumerConfig consumerConfig = + new ConsumerConfig(masterServers, groupName); + consumerConfig.setRpcTimeoutMs(rpcTimeoutMs); + consumerConfig.setPushFetchThreadCnt(fetchThreadCnt); + consumerConfig.setConsumePosition(consumePos); + // initial consumer object + if (isPushConsume) { + DefaultMessageListener msgListener = + new DefaultMessageListener(); + if (reuseConn) { + // if reuse connection, need use TubeSingleSessionFactory class + MessageSessionFactory msgSessionFactory = + new TubeSingleSessionFactory(consumerConfig); + this.sessionFactoryList.add(msgSessionFactory); + for (int i = 0; i < clientCount; i++) { + PushMessageConsumer consumer1 = + msgSessionFactory.createPushConsumer(consumerConfig); + for (Map.Entry<String, TreeSet<String>> entry + : topicAndFiltersMap.entrySet()) { + consumer1.subscribe(entry.getKey(), entry.getValue(), msgListener); + } + consumer1.completeSubscribe(); + consumerMap.put(consumer1, null); + } + } else { + for (int i = 0; i < clientCount; i++) { + MessageSessionFactory msgSessionFactory = + new TubeMultiSessionFactory(consumerConfig); + this.sessionFactoryList.add(msgSessionFactory); + PushMessageConsumer consumer1 = + msgSessionFactory.createPushConsumer(consumerConfig); + for (Map.Entry<String, TreeSet<String>> entry + : topicAndFiltersMap.entrySet()) { + consumer1.subscribe(entry.getKey(), entry.getValue(), msgListener); + } + consumer1.completeSubscribe(); + consumerMap.put(consumer1, null); + } + } + } else { + if (reuseConn) { + MessageSessionFactory msgSessionFactory = + new TubeSingleSessionFactory(consumerConfig); + this.sessionFactoryList.add(msgSessionFactory); + for (int i = 0; i < clientCount; i++) { + PullMessageConsumer consumer2 = + msgSessionFactory.createPullConsumer(consumerConfig); + for (Map.Entry<String, TreeSet<String>> entry + : topicAndFiltersMap.entrySet()) { + consumer2.subscribe(entry.getKey(), entry.getValue()); + } + consumer2.completeSubscribe(); + consumerMap.put(consumer2, + new TupleValue(consumer2, msgCount, fetchThreadCnt)); + } + } else { + for (int i = 0; i < clientCount; i++) { + MessageSessionFactory msgSessionFactory = + new TubeMultiSessionFactory(consumerConfig); + this.sessionFactoryList.add(msgSessionFactory); + PullMessageConsumer consumer2 = + msgSessionFactory.createPullConsumer(consumerConfig); + for (Map.Entry<String, TreeSet<String>> entry + : topicAndFiltersMap.entrySet()) { + consumer2.subscribe(entry.getKey(), entry.getValue()); + } + consumer2.completeSubscribe(); + consumerMap.put(consumer2, + new TupleValue(consumer2, msgCount, fetchThreadCnt)); + } + } + } + isStarted = true; + } + + public void shutdown() throws Throwable { + // stop process + ThreadUtils.sleep(20); + for (MessageConsumer consumer : consumerMap.keySet()) { + consumer.shutdown(); + } + for (MessageSessionFactory messageSessionFactory : sessionFactoryList) { + messageSessionFactory.shutdown(); + } + } + + + private static class TupleValue { + public Thread[] fetchRunners = null; + + public TupleValue(PullMessageConsumer consumer, long msgCount, int fetchThreadCnt) { + fetchRunners = new Thread[fetchThreadCnt]; + for (int i = 0; i < fetchRunners.length; i++) { + fetchRunners[i] = new Thread(new FetchRequestRunner(consumer, msgCount)); + fetchRunners[i].setName("_fetch_runner_" + i); + } + for (Thread thread : fetchRunners) { + thread.start(); + } + } + + } + + + // for push consumer callback process + private static class DefaultMessageListener implements MessageV2Listener { + + public DefaultMessageListener() { + } + + @Override + public void receiveMessages(PeerInfo peerInfo, List<Message> messages) { + if (messages != null && !messages.isEmpty()) { + TOTAL_COUNTER.addAndGet(messages.size()); + } + } + + @Override + public void receiveMessages(List<Message> messages) { + // deprecated + } + + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void stop() { + } + } + + // for push consumer process + private static class FetchRequestRunner implements Runnable { + + private final PullMessageConsumer messageConsumer; + private final long msgConsumeCnt; + + FetchRequestRunner(PullMessageConsumer messageConsumer, long msgConsumeCnt) { + this.messageConsumer = messageConsumer; + this.msgConsumeCnt = msgConsumeCnt; + } + + @Override + public void run() { + try { + do { + ConsumerResult result = messageConsumer.getMessage(); + if (result.isSuccess()) { + List<Message> messageList = result.getMessageList(); + if (messageList != null && !messageList.isEmpty()) { + TOTAL_COUNTER.addAndGet(messageList.size()); + } + messageConsumer.confirmConsume(result.getConfirmContext(), true); + } else { + if (!TErrCodeConstants.IGNORE_ERROR_SET.contains(result.getErrCode())) { + logger.info( + "Receive messages errorCode is {}, Error message is {}", + result.getErrCode(), + result.getErrMsg()); + if (messageConsumer.isShutdown()) { + break; + } + } + } + if (msgConsumeCnt >= 0) { + if (TOTAL_COUNTER.get() >= msgConsumeCnt) { + break; + } + } + } while (true); + } catch (TubeClientException e) { + logger.error("Create consumer failed!", e); + } + } + } + + public static void main(String[] args) { + CliConsumer cliConsumer = new CliConsumer(); + try { + boolean result = cliConsumer.parseParams(args); + if (!result) { + throw new Exception("Parse parameters failure!"); + } + cliConsumer.initTask(); + ThreadUtils.sleep(1000); + while (cliConsumer.msgCount < 0 + || TOTAL_COUNTER.get() < cliConsumer.msgCount * cliConsumer.clientCount) { + ThreadUtils.sleep(cliConsumer.printIntervalMs); + System.out.println("Required received count VS received message count = " + + (cliConsumer.msgCount * cliConsumer.clientCount) + + " : " + TOTAL_COUNTER.get()); + } + cliConsumer.shutdown(); + System.out.println("Finished, received count VS received message count = " + + (cliConsumer.msgCount * cliConsumer.clientCount) + + " : " + TOTAL_COUNTER.get()); + } catch (Throwable ex) { + ex.printStackTrace(); + logger.error(ex.getMessage()); + cliConsumer.help(); + } + + } + + +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java new file mode 100644 index 0000000..2734ac1 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.tools.cli; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; +import org.apache.tubemq.client.config.TubeClientConfig; +import org.apache.tubemq.client.factory.MessageSessionFactory; +import org.apache.tubemq.client.factory.TubeMultiSessionFactory; +import org.apache.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.tubemq.client.producer.MessageProducer; +import org.apache.tubemq.client.producer.MessageSentCallback; +import org.apache.tubemq.client.producer.MessageSentResult; +import org.apache.tubemq.corebase.Message; +import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.corebase.utils.MixedUtils; +import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.ThreadUtils; +import org.apache.tubemq.server.common.fielddef.CliArgDef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is use to process CLI Producer process. + * + * + */ +public class CliProducer extends CliAbstractBase { + + private static final Logger logger = + LoggerFactory.getLogger(CliProducer.class); + // statistic data index + private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0); + private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0); + private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0); + private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0); + // sent data content + private static byte[] sentData; + private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>(); + private final List<TupleValue> topicSendRounds = new ArrayList<>(); + private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(); + private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>(); + // cli parameters + private String masterServers; + private long msgCount = TBaseConstants.META_VALUE_UNDEFINED; + private boolean useRandData = true; + private int msgDataSize = 1000; + private String payloadFilePath = null; + private String payloadDelim = null; + private long rpcTimeoutMs = TBaseConstants.META_VALUE_UNDEFINED; + private boolean reuseConn = false; + private int clientCount = 1; + private int sendThreadCnt = 100; + private long printIntervalMs = 5000; + private boolean syncProduction = false; + private boolean withoutDelay = false; + private boolean isStarted = false; + private ExecutorService sendExecutorService = null; + + + public CliProducer() { + super("tubemq-producer-test.sh"); + initCommandOptions(); + } + + /** + * Init command options + */ + protected void initCommandOptions() { + // add the cli required parameters + addCommandOption(CliArgDef.MASTERSERVER); + addCommandOption(CliArgDef.MESSAGES); + addCommandOption(CliArgDef.MSGDATASIZE); + //addCommandOption(CliArgDef.PAYLOADFILE); + //addCommandOption(CliArgDef.PAYLOADDELIM); + addCommandOption(CliArgDef.PRDTOPIC); + addCommandOption(CliArgDef.RPCTIMEOUT); + addCommandOption(CliArgDef.CONNREUSE); + addCommandOption(CliArgDef.CLIENTCOUNT); + addCommandOption(CliArgDef.OUTPUTINTERVAL); + addCommandOption(CliArgDef.SYNCPRODUCE); + addCommandOption(CliArgDef.SENDTHREADS); + addCommandOption(CliArgDef.WITHOUTDELAY); + } + + public boolean parseParams(String[] args) throws Exception { + // parse parameters and check value + CommandLine cli = parser.parse(options, args); + if (cli == null) { + throw new ParseException("Parse args failure"); + } + if (cli.hasOption(CliArgDef.VERSION.longOpt)) { + version(); + } + if (cli.hasOption(CliArgDef.HELP.longOpt)) { + help(); + } + masterServers = cli.getOptionValue(CliArgDef.MASTERSERVER.longOpt); + if (TStringUtils.isBlank(masterServers)) { + throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!"); + } + String topicStr = cli.getOptionValue(CliArgDef.PRDTOPIC.longOpt); + if (TStringUtils.isBlank(topicStr)) { + throw new Exception(CliArgDef.PRDTOPIC.longOpt + " is required!"); + } + topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(topicStr)); + if (topicAndFiltersMap.isEmpty()) { + throw new Exception("Invalid " + CliArgDef.PRDTOPIC.longOpt + " parameter value!"); + } + String msgCntStr = cli.getOptionValue(CliArgDef.MESSAGES.longOpt); + if (TStringUtils.isNotBlank(msgCntStr)) { + msgCount = Long.parseLong(msgCntStr); + } + String msgDataSizeStr = cli.getOptionValue(CliArgDef.MSGDATASIZE.longOpt); + if (TStringUtils.isNotBlank(msgDataSizeStr)) { + msgDataSize = Integer.parseInt(msgDataSizeStr); + } + String reuseConnStr = cli.getOptionValue(CliArgDef.CONNREUSE.longOpt); + if (TStringUtils.isNotBlank(reuseConnStr)) { + reuseConn = Boolean.parseBoolean(reuseConnStr); + } + String sendThreadCntStr = cli.getOptionValue(CliArgDef.SENDTHREADS.longOpt); + if (TStringUtils.isNotBlank(sendThreadCntStr)) { + int tmpThreadCnt = Integer.parseInt(sendThreadCntStr); + tmpThreadCnt = (tmpThreadCnt < 1) ? 1 : Math.min(tmpThreadCnt, 200); + sendThreadCnt = tmpThreadCnt; + } + String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt); + if (TStringUtils.isNotBlank(rpcTimeoutStr)) { + rpcTimeoutMs = Long.parseLong(rpcTimeoutStr); + } + String clientCntStr = cli.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt); + if (TStringUtils.isNotBlank(clientCntStr)) { + clientCount = Integer.parseInt(clientCntStr); + } + String printIntMsStr = cli.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt); + if (TStringUtils.isNotBlank(printIntMsStr)) { + printIntervalMs = Long.parseLong(printIntMsStr); + if (printIntervalMs < 5000) { + throw new Exception("Invalid " + + CliArgDef.OUTPUTINTERVAL.longOpt + + " parameter value!"); + } + } + if (cli.hasOption(CliArgDef.SYNCPRODUCE.longOpt)) { + syncProduction = true; + } + if (cli.hasOption(CliArgDef.WITHOUTDELAY.longOpt)) { + withoutDelay = true; + } + return true; + } + // initial tubemq client order by caller required + public void initTask() throws Exception { + // initial sent data + sentData = MixedUtils.buildTestData(msgDataSize); + // initial client configure + TubeClientConfig clientConfig = new TubeClientConfig(masterServers); + clientConfig.setRpcTimeoutMs(rpcTimeoutMs); + // initial topic send round + for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) { + if (entry.getValue().isEmpty()) { + topicSendRounds.add(new TupleValue(entry.getKey())); + } else { + for (String filter : entry.getValue()) { + topicSendRounds.add(new TupleValue(entry.getKey(), filter)); + } + } + } + // initial send thread service + sendExecutorService = + Executors.newFixedThreadPool(sendThreadCnt, new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + return new Thread(runnable, "sender_" + producerMap.size()); + } + }); + // initial producer object + if (reuseConn) { + // if resue connection, use TubeSingleSessionFactory class + MessageSessionFactory msgSessionFactory = + new TubeSingleSessionFactory(clientConfig); + this.sessionFactoryList.add(msgSessionFactory); + for (int i = 0; i < clientCount; i++) { + MessageProducer producer = msgSessionFactory.createProducer(); + producer.publish(topicAndFiltersMap.keySet()); + producerMap.put(producer, new MsgSender(producer)); + // send send task + sendExecutorService.submit(producerMap.get(producer)); + } + } else { + for (int i = 0; i < clientCount; i++) { + // if not resue connection, use TubeMultiSessionFactory class + MessageSessionFactory msgSessionFactory = + new TubeMultiSessionFactory(clientConfig); + this.sessionFactoryList.add(msgSessionFactory); + MessageProducer producer = msgSessionFactory.createProducer(); + producer.publish(topicAndFiltersMap.keySet()); + producerMap.put(producer, new MsgSender(producer)); + // send send task + sendExecutorService.submit(producerMap.get(producer)); + } + } + isStarted = true; + } + + public void shutdown() throws Throwable { + // stop process + if (sendExecutorService != null) { + sendExecutorService.shutdownNow(); + } + ThreadUtils.sleep(20); + for (MessageProducer producer : producerMap.keySet()) { + producer.shutdown(); + } + for (MessageSessionFactory messageSessionFactory : sessionFactoryList) { + messageSessionFactory.shutdown(); + } + } + + // process message send + public class MsgSender implements Runnable { + + private final MessageProducer producer; + + public MsgSender(MessageProducer producer) { + this.producer = producer; + } + + @Override + public void run() { + int topicAndCondCnt = topicSendRounds.size(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); + long sentCount = 0; + int roundIndex = 0; + while (msgCount < 0 || sentCount < msgCount) { + roundIndex = (int) (sentCount++ % topicAndCondCnt); + try { + long millis = System.currentTimeMillis(); + TupleValue tupleValue = topicSendRounds.get(roundIndex); + Message message = new Message(tupleValue.topic, sentData); + if (tupleValue.filter != null) { + // if include filter, add filter item + message.putSystemHeader(tupleValue.filter, sdf.format(new Date(millis))); + } + // use sync or async process + if (syncProduction) { + MessageSentResult procResult = + producer.sendMessage(message); + TOTAL_COUNTER.incrementAndGet(); + if (procResult.isSuccess()) { + SENT_SUCC_COUNTER.incrementAndGet(); + } else { + SENT_FAIL_COUNTER.incrementAndGet(); + } + } else { + producer.sendMessage(message, new DefaultSendCallback()); + } + } catch (Throwable e1) { + TOTAL_COUNTER.incrementAndGet(); + SENT_EXCEPT_COUNTER.incrementAndGet(); + logger.error("sendMessage exception: ", e1); + } + // Limit sending flow control to avoid frequent errors + // caused by too many inflight messages being sent + if (!withoutDelay) { + if (sentCount % 5000 == 0) { + ThreadUtils.sleep(3000); + } else if (sentCount % 4000 == 0) { + ThreadUtils.sleep(2000); + } else if (sentCount % 2000 == 0) { + ThreadUtils.sleep(800); + } else if (sentCount % 1000 == 0) { + ThreadUtils.sleep(400); + } + } + } + // finished, close client + try { + producer.shutdown(); + } catch (Throwable e) { + logger.error("producer shutdown error: ", e); + } + } + } + + private class DefaultSendCallback implements MessageSentCallback { + @Override + public void onMessageSent(MessageSentResult result) { + TOTAL_COUNTER.incrementAndGet(); + if (result.isSuccess()) { + SENT_SUCC_COUNTER.incrementAndGet(); + } else { + SENT_FAIL_COUNTER.incrementAndGet(); + } + } + + @Override + public void onException(Throwable e) { + TOTAL_COUNTER.incrementAndGet(); + SENT_EXCEPT_COUNTER.incrementAndGet(); + logger.error("Send message error!", e); + } + } + + private static class TupleValue { + public String topic = null; + public String filter = null; + + public TupleValue(String topic) { + this.topic = topic; + } + + public TupleValue(String topic, String filter) { + this.topic = topic; + this.filter = filter; + } + + } + + public static void main(String[] args) { + CliProducer cliProducer = new CliProducer(); + try { + boolean result = cliProducer.parseParams(args); + if (!result) { + throw new Exception("Parse parameters failure!"); + } + cliProducer.initTask(); + ThreadUtils.sleep(1000); + while (cliProducer.msgCount < 0 + || TOTAL_COUNTER.get() < cliProducer.msgCount * cliProducer.clientCount) { + ThreadUtils.sleep(cliProducer.printIntervalMs); + System.out.println("Required send count VS sent message count = " + + (cliProducer.msgCount * cliProducer.clientCount) + + " : " + TOTAL_COUNTER.get() + + " (" + SENT_SUCC_COUNTER.get() + + ":" + SENT_FAIL_COUNTER.get() + + ":" + SENT_EXCEPT_COUNTER.get() + + ")"); + } + cliProducer.shutdown(); + System.out.println("Finished, required send count VS sent message count = " + + (cliProducer.msgCount * cliProducer.clientCount) + + " : " + TOTAL_COUNTER.get() + + " (" + SENT_SUCC_COUNTER.get() + + ":" + SENT_FAIL_COUNTER.get() + + ":" + SENT_EXCEPT_COUNTER.get() + + ")"); + } catch (Throwable ex) { + ex.printStackTrace(); + logger.error(ex.getMessage()); + cliProducer.help(); + } + + } + + +}
