This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1510cf627d7e90c656d908296ad2f2cd7a66fa39 Author: Xiangying Meng <[email protected]> AuthorDate: Mon Dec 13 13:09:18 2021 +0800 [Transaction] Fix performance (#13253) ### Motivation The send-completion callback in PerformanceProducer omitted a `messagesSent.increment()` call, and several log messages in the performance tools used inconsistent `---` separators (missing surrounding spaces, or `--` instead of `---`). ### Modification 1. Add the missing `messagesSent.increment()` call. 2. Normalize the `---` separators in the log format strings. (cherry picked from commit c531c1ca0d74e0c771786ffb9930611c82a0aefe) --- .../org/apache/pulsar/testclient/PerformanceConsumer.java | 4 ++-- .../org/apache/pulsar/testclient/PerformanceProducer.java | 3 ++- .../apache/pulsar/testclient/PerformanceTransaction.java | 14 +++++++------- .../pulsar/testclient/PerformanceTransactionTest.java | 2 +- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 134ffdd..9c15a0e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -573,7 +573,7 @@ public class PerformanceConsumer { dec.format(rateAck)); } log.info( - "Throughput received: {} msg --- {} msg/s -- {} Mbit/s " + "Throughput received: {} msg --- {} msg/s --- {} Mbit/s " + "--- Latency: mean: {} ms - med: {} " + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", intFormat.format(total), @@ -622,7 +622,7 @@ public class PerformanceConsumer { } log.info( "Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s" - + "--- AckRate: {} msg/s --- ack failed {} msg", + + " --- AckRate: {} msg/s --- ack failed {} msg", totalMessagesReceived.sum(), dec.format(rate), dec.format(throughput), diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index cfd5568..20eb8f3 100644 --- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -509,7 +509,7 @@ public class PerformanceProducer { totalTxnOpSuccess = totalEndTxnOpSuccessNum.sum(); totalTxnOpFail = totalEndTxnOpFailNum.sum(); rateOpenTxn = numTxnOpSuccess.sumThenReset() / elapsed; - log.info("--- Transaction : {} transaction end successfully ---{} transaction end failed " + log.info("--- Transaction : {} transaction end successfully --- {} transaction end failed " + "--- {} Txn/s", totalTxnOpSuccess, totalTxnOpFail, totalFormat.format(rateOpenTxn)); } @@ -728,6 +728,7 @@ public class PerformanceProducer { PulsarClient pulsarClient = client; messageBuilder.sendAsync().thenRun(() -> { bytesSent.add(payloadData.length); + messagesSent.increment(); totalMessagesSent.increment(); totalBytesSent.add(payloadData.length); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index 49d441e..5127f85 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -533,8 +533,8 @@ public class PerformanceTransaction { ? 
"Throughput transaction: {} transaction executes --- {} transaction/s" : "Throughput task: {} task executes --- {} task/s"; log.info( - txnOrTaskLog + " ---send Latency: mean: {} ms - med: {} " - + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + "---ack Latency: " + txnOrTaskLog + " --- send Latency: mean: {} ms - med: {} " + + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + " --- ack Latency: " + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", intFormat.format(total), dec.format(rate), @@ -582,8 +582,8 @@ public class PerformanceTransaction { "Aggregated throughput stats --- {} transaction executed --- {} transaction/s " + " --- {} transaction open successfully --- {} transaction open failed" + " --- {} transaction end successfully --- {} transaction end failed" - + "--- {} message ack failed --- {} message send failed" - + "--- {} message ack success --- {} message send success ", + + " --- {} message ack failed --- {} message send failed" + + " --- {} message ack success --- {} message send success ", total, dec.format(rate), numTransactionOpenSuccess, @@ -606,9 +606,9 @@ public class PerformanceTransaction { long numMessageSendFailed = numMessagesSendFailed.sum(); long numMessageSendSuccess = numMessagesSendSuccess.sum(); log.info( - "Aggregated throughput stats --- {} task executed --- {} task/s " - + "--- {} message ack failed --- {} message send failed" - + "--- {} message ack success --- {} message send success ", + "Aggregated throughput stats --- {} task executed --- {} task/s" + + " --- {} message ack failed --- {} message send failed" + + " --- {} message ack success --- {} message send success", total, totalFormat.format(rate), numMessageAckFailed, diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index e04ea04..c5e62f7 100644 
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -212,7 +212,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest { .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); for (int i = 0; i < 505; i++) { - producer.newMessage().send(); + producer.newMessage().value("messages for test transaction consumer".getBytes()).send(); } Thread thread = new Thread(() -> { try {
