codelipenghui commented on a change in pull request #14894:
URL: https://github.com/apache/pulsar/pull/14894#discussion_r838543270



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -87,7 +96,18 @@ public TransactionBufferHandlerImpl(PulsarClient 
pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, 
txnIdMostBits,
                 topic, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
+
+        try {

Review comment:
       `lookupCache.get(topic)` can throw an `ExecutionException`, which means 
an error occurred while loading the entry into the lookup cache. We need to catch 
that exception and invalidate the cached entry for the topic.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -102,79 +122,101 @@ public TransactionBufferHandlerImpl(PulsarClient 
pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, 
txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
-    }
-
-    private CompletableFuture<TxnID> endTxn(long requestId, String topic, 
ByteBuf cmd, CompletableFuture<TxnID> cb) {
-        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
         try {
-            cache.get(topic).whenComplete((clientCnx, throwable) -> {
-                if (throwable == null) {
-                    if (clientCnx.ctx().channel().isActive()) {
-                        
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                        pendingRequests.put(requestId, op);
-                        timer.newTimeout(timeout -> {
-                            OpRequestSend peek = 
pendingRequests.remove(requestId);
-                            if (peek != null && !peek.cb.isDone() && 
!peek.cb.isCompletedExceptionally()) {
-                                peek.cb.completeExceptionally(new 
TransactionBufferClientException
-                                        .RequestTimeoutException());
-                                onResponse(peek);
-                            }
-                        }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
-                        cmd.retain();
-                        clientCnx.ctx().writeAndFlush(cmd, 
clientCnx.ctx().voidPromise());
-                    } else {
-                        cache.invalidate(topic);
-                        cb.completeExceptionally(
-                                new 
PulsarClientException.LookupException(topic + " endTxn channel is not active"));
-                        op.recycle();
-                    }
-                } else {
-                    log.error("endTxn error topic: [{}]", topic, throwable);
-                    cache.invalidate(topic);
-                    cb.completeExceptionally(
-                            new 
PulsarClientException.LookupException(throwable.getMessage()));
-                    op.recycle();
-                }
-            });
+            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, 
lookupCache.get(topic));
+            if (checkRequestCredits(op)) {
+                endTxn(op);
+            }
         } catch (ExecutionException e) {
-            log.error("endTxn channel is not active exception", e);
-            cache.invalidate(topic);
+            log.error("[{}] failed to get client cnx from lookup cache", 
topic, e);
+            lookupCache.invalidate(topic);
             cb.completeExceptionally(new 
PulsarClientException.LookupException(e.getCause().getMessage()));
-            op.recycle();
         }
         return cb;
     }
 
+    private boolean checkRequestCredits(OpRequestSend op) {
+        int currentPermits = REQUEST_CREDITS_UPDATER.get(this);
+        if (currentPermits > 0 && pendingRequests.peek() == null) {
+            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits, 
currentPermits - 1)) {
+                return true;
+            } else {
+                return checkRequestCredits(op);
+            }
+        } else {
+            pendingRequests.add(op);
+            return false;
+        }
+    }
+
+    public void endTxn(OpRequestSend op) {
+        op.cnx.whenComplete((clientCnx, throwable) -> {
+            if (throwable == null) {
+                if (clientCnx.ctx().channel().isActive()) {
+                    
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                    outstandingRequests.put(op.requestId, op);
+                    timer.newTimeout(timeout -> {
+                        OpRequestSend peek = 
outstandingRequests.remove(op.requestId);
+                        if (peek != null && !peek.cb.isDone() && 
!peek.cb.isCompletedExceptionally()) {
+                            peek.cb.completeExceptionally(new 
TransactionBufferClientException
+                                    .RequestTimeoutException());
+                            onResponse(peek);
+                        }
+                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
+                    op.cmd.retain();
+                    clientCnx.ctx().writeAndFlush(op.cmd, 
clientCnx.ctx().voidPromise());
+                } else {
+                    invalidateLookupCache(op);
+                    op.cb.completeExceptionally(
+                            new PulsarClientException.LookupException(op.topic 
+ " endTxn channel is not active"));
+                    onResponse(op);
+                }
+            } else {
+                log.error("endTxn error topic: [{}]", op.topic, throwable);
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(
+                        new 
PulsarClientException.LookupException(throwable.getMessage()));
+                onResponse(op);
+            }
+        });
+    }
+
     @Override
     public void handleEndTxnOnTopicResponse(long requestId, 
CommandEndTxnOnPartitionResponse response) {
-        OpRequestSend op = pendingRequests.remove(requestId);
+        OpRequestSend op = outstandingRequests.remove(requestId);
         if (op == null) {
             if (log.isDebugEnabled()) {
                 log.debug("Got end txn on topic response for timeout {} - {}", 
response.getTxnidMostBits(),
                         response.getTxnidLeastBits());
             }
             return;
         }
-
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on topic response for for request 
{}", op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on topic response for for 
request {}", op.topic,
+                            response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on topic response for request {} 
error {}", op.topic,
+                        response.getRequestId(),
+                        response.getError());
+                invalidateLookupCache(op);
+                
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on topic response for request {} error 
{}", op.topic, response.getRequestId(),
-                    response.getError());
-            cache.invalidate(op.topic);
-            
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
 response.getMessage()));
+        } catch (Exception e) {

Review comment:
       I just want to add an error log in case an exception is thrown while 
completing the callback; the `finally` block ensures that `onResponse()` is 
always called.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -183,41 +225,98 @@ public void handleEndTxnOnSubscriptionResponse(long 
requestId,
             return;
         }
 
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on subscription response for for 
request {}",
-                        op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on subscription response for 
for request {}",
+                            op.topic, response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on subscription response for 
request {} error {}",
+                        op.topic, response.getRequestId(), 
response.getError());
+                invalidateLookupCache(op);
+                
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on subscription response for request 
{} error {}",
-                    op.topic, response.getRequestId(), response.getError());
-            cache.invalidate(op.topic);
-            
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
 response.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Got exception when complete EndTxnOnSub op for 
request {}", op.topic, e);
+        } finally {
+            onResponse(op);
         }
-        onResponse(op);
     }
 
-    void onResponse(OpRequestSend op) {
-        ReferenceCountUtil.safeRelease(op.byteBuf);
-        op.recycle();
+    public void onResponse(OpRequestSend op) {
+        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+        if (op != null) {
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+        }
+        checkPendingRequests();
     }
 
-    private static final class OpRequestSend {
+    private void checkPendingRequests() {
+        while (true) {
+            int permits = REQUEST_CREDITS_UPDATER.get(this);
+            if (permits > 0 && pendingRequests.peek() != null) {
+                if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, 
permits - 1)) {
+                    OpRequestSend polled = pendingRequests.poll();
+                    if (polled != null) {
+                        try {
+                            if (polled.cnx != lookupCache.get(polled.topic)) {
+                                OpRequestSend invalid = polled;
+                                polled = 
OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+                                        lookupCache.get(invalid.topic));
+                                invalid.recycle();
+                            }
+                            endTxn(polled);
+                        } catch (ExecutionException e) {
+                            log.error("[{}] failed to get client cnx from 
lookup cache", polled.topic, e);
+                            lookupCache.invalidate(polled.topic);
+                            polled.cb.completeExceptionally(new 
PulsarClientException.LookupException(
+                                    e.getCause().getMessage()));
+                            REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                        }
+                    } else {
+                        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                    }
+                } else {
+                    checkPendingRequests();

Review comment:
       Nice catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to