liangyepianzhou commented on a change in pull request #11933:
URL: https://github.com/apache/pulsar/pull/11933#discussion_r716157771
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -340,9 +344,123 @@ public static void main(String[] args) throws Exception {
if (isNotBlank(arguments.listenerName)) {
clientBuilder.listenerName(arguments.listenerName);
}
-
PulsarClient pulsarClient = clientBuilder.build();
+ AtomicReference<Transaction> atomicReference =
buildTransaction(pulsarClient, arguments.isEnableTransaction,
+ arguments.transactionTimeout);
+
+ AtomicLong messageAckedCount = new AtomicLong();
+ Semaphore messageReceiveLimiter = new
Semaphore(arguments.numMessagesPerTransaction);
+ Thread thread = Thread.currentThread();
+ MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+ if(arguments.isEnableTransaction){
+ try {
+ messageReceiveLimiter.acquire();
+ }catch (InterruptedException e){
+ log.error("Got error: ", e);
+ }
+ }
+ if (arguments.testTime > 0) {
+ if (System.nanoTime() > testEndTime) {
+ log.info("------------------- DONE
-----------------------");
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ thread.interrupt();
+ }
+ }
+ if(arguments.totalNumTxn > 0) {
+ if (totalEndTxnOpFailNum.sum() +
totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
+ log.info("------------------- DONE
-----------------------");
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ thread.interrupt();
+ }
+ }
+ messagesReceived.increment();
+ bytesReceived.add(msg.size());
+
+ totalMessagesReceived.increment();
+ totalBytesReceived.add(msg.size());
+
+ if (limiter != null) {
+ limiter.acquire();
+ }
+
+ long latencyMillis = System.currentTimeMillis() -
msg.getPublishTime();
+ if (latencyMillis >= 0) {
+ recorder.recordValue(latencyMillis);
+ cumulativeRecorder.recordValue(latencyMillis);
+ }
+ if (arguments.isEnableTransaction) {
+ consumer.acknowledgeAsync(msg.getMessageId(),
atomicReference.get()).thenRun(() -> {
+ totalMessageAck.increment();
+ messageAck.increment();
+ }).exceptionally(throwable ->{
+ log.error("Ack message {} failed with exception", msg,
throwable);
+ totalMessageAckFailed.increment();
+ return null;
+ });
+ } else {
+ consumer.acknowledgeAsync(msg).thenRun(()->{
+ totalMessageAck.increment();
+ messageAck.increment();
+ }
+ ).exceptionally(throwable ->{
+ log.error("Ack message {} failed with
exception", msg, throwable);
+ totalMessageAckFailed.increment();
+ return null;
+ }
+ );
+ }
+ if(arguments.poolMessages) {
+ msg.release();
+ }
+ if (arguments.isEnableTransaction
Review comment:
It is not good to write the Ack of the message and the commit of the
transaction together. In this way if is too far away from else, it feels that
the segmentation is not very good
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]