This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1510cf627d7e90c656d908296ad2f2cd7a66fa39
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Dec 13 13:09:18 2021 +0800

    [Transaction] Fix performance (#13253)
    
    ### Motivation
    There is one omission and several irregular log formats.
    ### Modification
    1. messagesSent.increment()
    2. log format
    
    (cherry picked from commit c531c1ca0d74e0c771786ffb9930611c82a0aefe)
---
 .../org/apache/pulsar/testclient/PerformanceConsumer.java  |  4 ++--
 .../org/apache/pulsar/testclient/PerformanceProducer.java  |  3 ++-
 .../apache/pulsar/testclient/PerformanceTransaction.java   | 14 +++++++-------
 .../pulsar/testclient/PerformanceTransactionTest.java      |  2 +-
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 134ffdd..9c15a0e 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -573,7 +573,7 @@ public class PerformanceConsumer {
                         dec.format(rateAck));
             }
             log.info(
-                    "Throughput received: {} msg --- {}  msg/s -- {} Mbit/s  "
+                    "Throughput received: {} msg --- {}  msg/s --- {} Mbit/s  "
                             + "--- Latency: mean: {} ms - med: {} "
                             + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 
99.99pct: {} - Max: {}",
                     intFormat.format(total),
@@ -622,7 +622,7 @@ public class PerformanceConsumer {
         }
         log.info(
             "Aggregated throughput stats --- {} records received --- {} msg/s 
--- {} Mbit/s"
-                 + "--- AckRate: {}  msg/s --- ack failed {} msg",
+                 + " --- AckRate: {}  msg/s --- ack failed {} msg",
             totalMessagesReceived.sum(),
             dec.format(rate),
             dec.format(throughput),
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index cfd5568..20eb8f3 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -509,7 +509,7 @@ public class PerformanceProducer {
                 totalTxnOpSuccess = totalEndTxnOpSuccessNum.sum();
                 totalTxnOpFail = totalEndTxnOpFailNum.sum();
                 rateOpenTxn = numTxnOpSuccess.sumThenReset() / elapsed;
-                log.info("--- Transaction : {} transaction end successfully 
---{} transaction end failed "
+                log.info("--- Transaction : {} transaction end successfully 
--- {} transaction end failed "
                                 + "--- {} Txn/s",
                         totalTxnOpSuccess, totalTxnOpFail, 
totalFormat.format(rateOpenTxn));
             }
@@ -728,6 +728,7 @@ public class PerformanceProducer {
                     PulsarClient pulsarClient = client;
                     messageBuilder.sendAsync().thenRun(() -> {
                         bytesSent.add(payloadData.length);
+                        messagesSent.increment();
 
                         totalMessagesSent.increment();
                         totalBytesSent.add(payloadData.length);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 49d441e..5127f85 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -533,8 +533,8 @@ public class PerformanceTransaction {
                     ? "Throughput transaction: {} transaction executes --- {} 
transaction/s"
                     : "Throughput task: {} task executes --- {} task/s";
             log.info(
-                    txnOrTaskLog + "  ---send Latency: mean: {} ms - med: {} "
-                            + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 
99.99pct: {} - Max: {}" + "---ack Latency: "
+                    txnOrTaskLog + "  --- send Latency: mean: {} ms - med: {} "
+                            + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 
99.99pct: {} - Max: {}" + " --- ack Latency: "
                             + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 
99.9pct: {} - 99.99pct: {} - Max: {}",
                     intFormat.format(total),
                     dec.format(rate),
@@ -582,8 +582,8 @@ public class PerformanceTransaction {
                 "Aggregated throughput stats --- {} transaction executed --- 
{} transaction/s "
                         + " --- {} transaction open successfully --- {} 
transaction open failed"
                         + " --- {} transaction end successfully --- {} 
transaction end failed"
-                        + "--- {} message ack failed --- {} message send 
failed"
-                        + "--- {} message ack success --- {} message send 
success ",
+                        + " --- {} message ack failed --- {} message send 
failed"
+                        + " --- {} message ack success --- {} message send 
success ",
                 total,
                 dec.format(rate),
                 numTransactionOpenSuccess,
@@ -606,9 +606,9 @@ public class PerformanceTransaction {
         long numMessageSendFailed = numMessagesSendFailed.sum();
         long numMessageSendSuccess = numMessagesSendSuccess.sum();
         log.info(
-                "Aggregated throughput stats --- {} task executed --- {} 
task/s "
-                        + "--- {} message ack failed --- {} message send 
failed"
-                        + "--- {} message ack success --- {} message send 
success ",
+                "Aggregated throughput stats --- {} task executed --- {} 
task/s"
+                        + " --- {} message ack failed --- {} message send 
failed"
+                        + " --- {} message ack success --- {} message send 
success",
                 total,
                 totalFormat.format(rate),
                 numMessageAckFailed,
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index e04ea04..c5e62f7 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -212,7 +212,7 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         for (int i = 0; i < 505; i++) {
-            producer.newMessage().send();
+            producer.newMessage().value("messages for test transaction 
consumer".getBytes()).send();
         }
         Thread thread = new Thread(() -> {
             try {

Reply via email to