liangyepianzhou commented on a change in pull request #11933:
URL: https://github.com/apache/pulsar/pull/11933#discussion_r716157771



##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -340,9 +344,123 @@ public static void main(String[] args) throws Exception {
         if (isNotBlank(arguments.listenerName)) {
             clientBuilder.listenerName(arguments.listenerName);
         }
-
         PulsarClient pulsarClient = clientBuilder.build();
 
+        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments.isEnableTransaction,
+                arguments.transactionTimeout);
+
+        AtomicLong messageAckedCount = new AtomicLong();
+        Semaphore messageReceiveLimiter = new Semaphore(arguments.numMessagesPerTransaction);
+        Thread thread = Thread.currentThread();
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+                if(arguments.isEnableTransaction){
+                    try {
+                        messageReceiveLimiter.acquire();
+                    }catch (InterruptedException e){
+                        log.error("Got error: ", e);
+                    }
+                    }
+                if (arguments.testTime > 0) {
+                    if (System.nanoTime() > testEndTime) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                        thread.interrupt();
+                    }
+                }
+                if(arguments.totalNumTxn > 0) {
+                    if (totalEndTxnOpFailNum.sum() + totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                        thread.interrupt();
+                    }
+                }
+                messagesReceived.increment();
+                bytesReceived.add(msg.size());
+
+                totalMessagesReceived.increment();
+                totalBytesReceived.add(msg.size());
+
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+
+                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+                if (latencyMillis >= 0) {
+                    recorder.recordValue(latencyMillis);
+                    cumulativeRecorder.recordValue(latencyMillis);
+                }
+                if (arguments.isEnableTransaction) {
+                    consumer.acknowledgeAsync(msg.getMessageId(), atomicReference.get()).thenRun(() -> {
+                        totalMessageAck.increment();
+                        messageAck.increment();
+                    }).exceptionally(throwable ->{
+                        log.error("Ack message {} failed with exception", msg, throwable);
+                        totalMessageAckFailed.increment();
+                        return null;
+                    });
+                } else {
+                    consumer.acknowledgeAsync(msg).thenRun(()->{
+                        totalMessageAck.increment();
+                        messageAck.increment();
+                    }
+                    ).exceptionally(throwable ->{
+                                log.error("Ack message {} failed with exception", msg, throwable);
+                                totalMessageAckFailed.increment();
+                                return null;
+                            }
+                    );
+                }
+                if(arguments.poolMessages) {
+                    msg.release();
+                }
+                if (arguments.isEnableTransaction

Review comment:
       Acknowledging the message and committing the transaction should not 
be written together in one block. Written this way, the `if` ends up too far 
from its `else`, so the code is poorly segmented and hard to follow.
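
       As a rough illustration of the split suggested above, the sketch below 
keeps acknowledgement and commit in two separate helpers. It is not the PR's 
code: `Txn`, `Acker`, `ackMessage`, and `commitIfBatchFull` are hypothetical 
stand-ins for the Pulsar consumer and transaction types, and the counters only 
mimic the ones in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

public class AckCommitSketch {
    // Hypothetical stand-ins for the Pulsar Transaction and Consumer types.
    interface Txn { CompletableFuture<Void> commit(); }
    interface Acker { CompletableFuture<Void> ack(String messageId, Txn txnOrNull); }

    static final LongAdder acked = new LongAdder();
    static final LongAdder ackFailed = new LongAdder();
    static final LongAdder committed = new LongAdder();

    // Step 1: acknowledge only. Both modes share one success/failure handler;
    // a null transaction stands for the non-transactional path.
    static CompletableFuture<Void> ackMessage(Acker acker, String messageId, Txn txnOrNull) {
        return acker.ack(messageId, txnOrNull)
                .thenRun(acked::increment)
                .exceptionally(t -> { ackFailed.increment(); return null; });
    }

    // Step 2: commit only, and only once the batch for this transaction is full.
    static boolean commitIfBatchFull(Txn txn, long ackedInTxn, long messagesPerTxn) {
        if (txn == null || ackedInTxn < messagesPerTxn) {
            return false;
        }
        txn.commit().thenRun(committed::increment);
        return true;
    }

    public static void main(String[] args) {
        // In-memory fakes that complete immediately, for demonstration only.
        Txn txn = () -> CompletableFuture.completedFuture(null);
        Acker acker = (id, t) -> CompletableFuture.completedFuture(null);
        for (int i = 1; i <= 3; i++) {
            ackMessage(acker, "msg-" + i, txn).join();
            commitIfBatchFull(txn, i, 3);
        }
        System.out.println(acked.sum() + " acked, " + committed.sum() + " committed");
    }
}
```

       With this shape the listener body reads as two short steps (ack, then 
maybe commit), and the `if`/`else` on the transaction flag no longer spans 
both concerns.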




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

