This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fa549f6db79f035a6ec74174dbf00aaedf175ec6 Author: Xiangying Meng <[email protected]> AuthorDate: Fri Jan 28 04:10:12 2022 +0800 [Transaction] Resolve the performance bottleneck of TransactionBufferHandle (#13988) ### Motivation Previously, synchronization locks were frequently used in TransactionBufferHandleImpl in order to achieve request timeouts. For this reason, the performance of TC has been affected. ### Modification Each request uses a Timeout to do a timeout check (cherry picked from commit f48b53d33ee1be9b8436593119dfbce38be2c81f) --- .../buffer/impl/TransactionBufferHandlerImpl.java | 55 ++++++---------------- 1 file changed, 15 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index 54f77a3..cb74bf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -25,9 +25,6 @@ import io.netty.buffer.ByteBuf; import io.netty.util.HashedWheelTimer; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; @@ -49,12 +46,11 @@ import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.protocol.Commands; @Slf4j -public class TransactionBufferHandlerImpl implements TransactionBufferHandler, TimerTask { +public class TransactionBufferHandlerImpl implements TransactionBufferHandler { private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests; private final AtomicLong requestIdGenerator = new AtomicLong(); private final long operationTimeoutInMills; - private Timeout requestTimeout; private final HashedWheelTimer timer; private final Semaphore semaphore; private final boolean blockIfReachMaxPendingOps; @@ -84,11 +80,10 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T this.semaphore = new Semaphore(10000); this.blockIfReachMaxPendingOps = true; this.timer = timer; - this.requestTimeout = timer.newTimeout(this, operationTimeoutInMills, TimeUnit.MILLISECONDS); } @Override - public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits, + public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits, TxnAction action, long lowWaterMark) { if (log.isDebugEnabled()) { log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]", @@ -105,7 +100,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T } @Override - public synchronized CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, + public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits, TxnAction action, long lowWaterMark) { if (log.isDebugEnabled()) { @@ -129,10 +124,16 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T if (throwable == null) { if (clientCnx.ctx().channel().isActive()) { clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this); - synchronized (TransactionBufferHandlerImpl.this) { - pendingRequests.put(requestId, op); - cmd.retain(); - } + pendingRequests.put(requestId, op); + timer.newTimeout(timeout -> { + OpRequestSend peek = pendingRequests.remove(requestId); + if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) { + peek.cb.completeExceptionally(new TransactionBufferClientException + .RequestTimeoutException()); + onResponse(peek); + } + }, operationTimeoutInMills, TimeUnit.MILLISECONDS); + cmd.retain(); clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise()); } else { cache.invalidate(topic); @@ -158,7 +159,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T } @Override - public synchronized void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) { + public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) { OpRequestSend op = pendingRequests.remove(requestId); if (op == null) { if (log.isDebugEnabled()) { @@ -183,7 +184,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T } @Override - public synchronized void handleEndTxnOnSubscriptionResponse(long requestId, + public void handleEndTxnOnSubscriptionResponse(long requestId, CommandEndTxnOnSubscriptionResponse response) { OpRequestSend op = pendingRequests.remove(requestId); if (op == null) { @@ -227,32 +228,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T return true; } - public synchronized void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - long timeToWaitMs; - OpRequestSend peeked; - Map.Entry<Long, OpRequestSend> firstEntry = pendingRequests.firstEntry(); - peeked = firstEntry == null ? null : firstEntry.getValue(); - while (peeked != null && peeked.createdAt + operationTimeoutInMills - System.currentTimeMillis() <= 0) { - if (!peeked.cb.isDone()) { - peeked.cb.completeExceptionally(new TransactionBufferClientException.RequestTimeoutException()); - onResponse(peeked); - } else { - break; - } - firstEntry = pendingRequests.firstEntry(); - pendingRequests.remove(pendingRequests.firstKey()); - peeked = firstEntry == null ? null : firstEntry.getValue(); - } - if (peeked == null) { - timeToWaitMs = operationTimeoutInMills; - } else { - timeToWaitMs = (peeked.createdAt + operationTimeoutInMills) - System.currentTimeMillis(); - } - requestTimeout = timer.newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS); - } void onResponse(OpRequestSend op) { ReferenceCountUtil.safeRelease(op.byteBuf);
