This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4c68404 [INLONG-2151][Improve] Add time and sort statistics by topic (#2152)
4c68404 is described below
commit 4c6840481936e4c4a071116368a45d084e7c4683
Author: gosonzhang <[email protected]>
AuthorDate: Thu Jan 13 12:14:17 2022 +0800
[INLONG-2151][Improve] Add time and sort statistics by topic (#2152)
---
.../tubemq/server/tools/cli/CliConsumer.java | 38 +++++++++++++++++++---
.../tubemq/server/tools/cli/CliProducer.java | 11 +++++--
2 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
index b60cf83..8417863 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
@@ -59,6 +60,9 @@ public class CliConsumer extends CliAbstractBase {
LoggerFactory.getLogger(CliConsumer.class);
// statistic data index
private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
+ private static final ConcurrentHashMap<String, AtomicLong> TOPIC_COUNT_MAP
=
+ new ConcurrentHashMap();
+ private long startTime = System.currentTimeMillis();
// sent data content
private final Map<String, TreeSet<String>> topicAndFiltersMap = new
HashMap<>();
private final List<MessageSessionFactory> sessionFactoryList = new
ArrayList<>();
@@ -188,6 +192,7 @@ public class CliConsumer extends CliAbstractBase {
consumerConfig.setRpcTimeoutMs(rpcTimeoutMs);
consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
consumerConfig.setConsumePosition(consumePos);
+ startTime = System.currentTimeMillis();
// initial consumer object
if (isPushConsume) {
DefaultMessageListener msgListener =
@@ -203,6 +208,7 @@ public class CliConsumer extends CliAbstractBase {
for (Map.Entry<String, TreeSet<String>> entry
: topicAndFiltersMap.entrySet()) {
consumer1.subscribe(entry.getKey(), entry.getValue(),
msgListener);
+ TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
}
consumer1.completeSubscribe();
consumerMap.put(consumer1, null);
@@ -217,6 +223,7 @@ public class CliConsumer extends CliAbstractBase {
for (Map.Entry<String, TreeSet<String>> entry
: topicAndFiltersMap.entrySet()) {
consumer1.subscribe(entry.getKey(), entry.getValue(),
msgListener);
+ TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
}
consumer1.completeSubscribe();
consumerMap.put(consumer1, null);
@@ -233,6 +240,7 @@ public class CliConsumer extends CliAbstractBase {
for (Map.Entry<String, TreeSet<String>> entry
: topicAndFiltersMap.entrySet()) {
consumer2.subscribe(entry.getKey(), entry.getValue());
+ TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
}
consumer2.completeSubscribe();
consumerMap.put(consumer2,
@@ -248,6 +256,7 @@ public class CliConsumer extends CliAbstractBase {
for (Map.Entry<String, TreeSet<String>> entry
: topicAndFiltersMap.entrySet()) {
consumer2.subscribe(entry.getKey(), entry.getValue());
+ TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
}
consumer2.completeSubscribe();
consumerMap.put(consumer2,
@@ -282,7 +291,6 @@ public class CliConsumer extends CliAbstractBase {
thread.start();
}
}
-
}
// for push consumer callback process
@@ -294,7 +302,11 @@ public class CliConsumer extends CliAbstractBase {
@Override
public void receiveMessages(PeerInfo peerInfo, List<Message> messages)
{
if (messages != null && !messages.isEmpty()) {
- TOTAL_COUNTER.addAndGet(messages.size());
+ int msgCnt = messages.size();
+ Message message = messages.get(0);
+ TOTAL_COUNTER.addAndGet(msgCnt);
+ AtomicLong accCount = TOPIC_COUNT_MAP.get(message.getTopic());
+ accCount.addAndGet(msgCnt);
}
}
@@ -327,7 +339,11 @@ public class CliConsumer extends CliAbstractBase {
if (result.isSuccess()) {
List<Message> messageList = result.getMessageList();
if (messageList != null && !messageList.isEmpty()) {
- TOTAL_COUNTER.addAndGet(messageList.size());
+ int msgCnt = messageList.size();
+ TOTAL_COUNTER.addAndGet(msgCnt);
+ AtomicLong accCount =
+ TOPIC_COUNT_MAP.get(result.getTopicName());
+ accCount.addAndGet(msgCnt);
}
messageConsumer.confirmConsume(result.getConfirmContext(), true);
} else {
@@ -365,14 +381,26 @@ public class CliConsumer extends CliAbstractBase {
while (cliConsumer.msgCount < 0
|| TOTAL_COUNTER.get() < cliConsumer.msgCount *
cliConsumer.clientCount) {
ThreadUtils.sleep(cliConsumer.printIntervalMs);
- System.out.println("Required received count VS received
message count = "
+ System.out.println("Continue, cost time: "
+ + (System.currentTimeMillis() - cliConsumer.startTime)
+ + " ms, required count VS received count = "
+ (cliConsumer.msgCount * cliConsumer.clientCount)
+ " : " + TOTAL_COUNTER.get());
+ for (Map.Entry<String, AtomicLong> entry :
TOPIC_COUNT_MAP.entrySet()) {
+ System.out.println("Topic Name = " + entry.getKey()
+ + ", count=" + entry.getValue().get());
+ }
}
cliConsumer.shutdown();
- System.out.println("Finished, received count VS received message
count = "
+ System.out.println("Finished, cost time: "
+ + (System.currentTimeMillis() - cliConsumer.startTime)
+ + " ms, required count VS received count = "
+ (cliConsumer.msgCount * cliConsumer.clientCount)
+ " : " + TOTAL_COUNTER.get());
+ for (Map.Entry<String, AtomicLong> entry :
TOPIC_COUNT_MAP.entrySet()) {
+ System.out.println("Topic Name = " + entry.getKey()
+ + ", count=" + entry.getValue().get());
+ }
} catch (Throwable ex) {
ex.printStackTrace();
logger.error(ex.getMessage());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index 0ac63a5..1a706f1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -55,6 +55,8 @@ public class CliProducer extends CliAbstractBase {
private static final Logger logger =
LoggerFactory.getLogger(CliProducer.class);
+ // start time
+ private long startTime = System.currentTimeMillis();
// statistic data index
private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
@@ -187,6 +189,7 @@ public class CliProducer extends CliAbstractBase {
sentData = MixedUtils.buildTestData(msgDataSize);
// initial topic send round
topicSendRounds =
MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+ startTime = System.currentTimeMillis();
// initial send thread service
sendExecutorService =
Executors.newFixedThreadPool(sendThreadCnt, new
ThreadFactory() {
@@ -323,7 +326,9 @@ public class CliProducer extends CliAbstractBase {
while (cliProducer.msgCount < 0
|| TOTAL_COUNTER.get() < cliProducer.msgCount *
cliProducer.clientCount) {
ThreadUtils.sleep(cliProducer.printIntervalMs);
- System.out.println("Required send count VS sent message count
= "
+ System.out.println("Continue, cost time: "
+ + (System.currentTimeMillis() - cliProducer.startTime)
+ + "ms, required count VS sent count = "
+ (cliProducer.msgCount * cliProducer.clientCount)
+ " : " + TOTAL_COUNTER.get()
+ " (" + SENT_SUCC_COUNTER.get()
@@ -332,7 +337,9 @@ public class CliProducer extends CliAbstractBase {
+ ")");
}
cliProducer.shutdown();
- System.out.println("Finished, required send count VS sent message
count = "
+ System.out.println("Finished, cost time: "
+ + (System.currentTimeMillis() - cliProducer.startTime)
+ + "ms, required count VS sent count = "
+ (cliProducer.msgCount * cliProducer.clientCount)
+ " : " + TOTAL_COUNTER.get()
+ " (" + SENT_SUCC_COUNTER.get()