This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa549f6db79f035a6ec74174dbf00aaedf175ec6
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Jan 28 04:10:12 2022 +0800

    [Transaction] Resolve the performance bottleneck of TransactionBufferHandle 
(#13988)
    
    ### Motivation
    Previously, synchronization locks were frequently used in 
TransactionBufferHandleImpl in order to achieve request timeouts. For this 
reason, the performance of TC has been affected.
    ### Modification
    Each request uses a Timeout to do a timeout check
    
    (cherry picked from commit f48b53d33ee1be9b8436593119dfbce38be2c81f)
---
 .../buffer/impl/TransactionBufferHandlerImpl.java  | 55 ++++++----------------
 1 file changed, 15 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 54f77a3..cb74bf3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -25,9 +25,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
@@ -49,12 +46,11 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.protocol.Commands;
 
 @Slf4j
-public class TransactionBufferHandlerImpl implements TransactionBufferHandler, 
TimerTask {
+public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
 
     private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
-    private Timeout requestTimeout;
     private final HashedWheelTimer timer;
     private final Semaphore semaphore;
     private final boolean blockIfReachMaxPendingOps;
@@ -84,11 +80,10 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler, T
         this.semaphore = new Semaphore(10000);
         this.blockIfReachMaxPendingOps = true;
         this.timer = timer;
-        this.requestTimeout = timer.newTimeout(this, operationTimeoutInMills, 
TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, 
long txnIdMostBits, long txnIdLeastBits,
+    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits,
                                                   TxnAction action, long 
lowWaterMark) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]",
@@ -105,7 +100,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler, T
     }
 
     @Override
-    public synchronized CompletableFuture<TxnID> endTxnOnSubscription(String 
topic, String subscription,
+    public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String 
subscription,
                                                                       long 
txnIdMostBits, long txnIdLeastBits,
                                                                       
TxnAction action, long lowWaterMark) {
         if (log.isDebugEnabled()) {
@@ -129,10 +124,16 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler, T
                 if (throwable == null) {
                     if (clientCnx.ctx().channel().isActive()) {
                         
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                        synchronized (TransactionBufferHandlerImpl.this) {
-                            pendingRequests.put(requestId, op);
-                            cmd.retain();
-                        }
+                        pendingRequests.put(requestId, op);
+                        timer.newTimeout(timeout -> {
+                            OpRequestSend peek = 
pendingRequests.remove(requestId);
+                            if (peek != null && !peek.cb.isDone() && 
!peek.cb.isCompletedExceptionally()) {
+                                peek.cb.completeExceptionally(new 
TransactionBufferClientException
+                                        .RequestTimeoutException());
+                                onResponse(peek);
+                            }
+                        }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
+                        cmd.retain();
                         clientCnx.ctx().writeAndFlush(cmd, 
clientCnx.ctx().voidPromise());
                     } else {
                         cache.invalidate(topic);
@@ -158,7 +159,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler, T
     }
 
     @Override
-    public synchronized void handleEndTxnOnTopicResponse(long requestId, 
CommandEndTxnOnPartitionResponse response) {
+    public void handleEndTxnOnTopicResponse(long requestId, 
CommandEndTxnOnPartitionResponse response) {
         OpRequestSend op = pendingRequests.remove(requestId);
         if (op == null) {
             if (log.isDebugEnabled()) {
@@ -183,7 +184,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler, T
     }
 
     @Override
-    public synchronized void handleEndTxnOnSubscriptionResponse(long requestId,
+    public void handleEndTxnOnSubscriptionResponse(long requestId,
                                                    
CommandEndTxnOnSubscriptionResponse response) {
         OpRequestSend op = pendingRequests.remove(requestId);
         if (op == null) {
@@ -227,32 +228,6 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler, T
         return true;
     }
 
-    public synchronized void run(Timeout timeout) throws Exception {
-        if (timeout.isCancelled()) {
-            return;
-        }
-        long timeToWaitMs;
-        OpRequestSend peeked;
-        Map.Entry<Long, OpRequestSend> firstEntry = 
pendingRequests.firstEntry();
-        peeked = firstEntry == null ? null : firstEntry.getValue();
-        while (peeked != null && peeked.createdAt + operationTimeoutInMills - 
System.currentTimeMillis() <= 0) {
-            if (!peeked.cb.isDone()) {
-                peeked.cb.completeExceptionally(new 
TransactionBufferClientException.RequestTimeoutException());
-                onResponse(peeked);
-            } else {
-                break;
-            }
-            firstEntry = pendingRequests.firstEntry();
-            pendingRequests.remove(pendingRequests.firstKey());
-            peeked = firstEntry == null ? null : firstEntry.getValue();
-        }
-        if (peeked == null) {
-            timeToWaitMs = operationTimeoutInMills;
-        } else {
-            timeToWaitMs = (peeked.createdAt + operationTimeoutInMills) - 
System.currentTimeMillis();
-        }
-        requestTimeout = timer.newTimeout(this, timeToWaitMs, 
TimeUnit.MILLISECONDS);
-    }
 
     void onResponse(OpRequestSend op) {
         ReferenceCountUtil.safeRelease(op.byteBuf);

Reply via email to