congbobo184 commented on a change in pull request #14894:
URL: https://github.com/apache/pulsar/pull/14894#discussion_r838472624



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -87,7 +96,18 @@ public TransactionBufferHandlerImpl(PulsarClient 
pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, 
txnIdMostBits,
                 topic, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
+
+        try {

Review comment:
       This try-catch can be deleted; the same applies to all of the ones below.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -102,79 +122,101 @@ public TransactionBufferHandlerImpl(PulsarClient 
pulsarClient,
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, 
txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
-        return endTxn(requestId, topic, cmd, cb);
-    }
-
-    private CompletableFuture<TxnID> endTxn(long requestId, String topic, 
ByteBuf cmd, CompletableFuture<TxnID> cb) {
-        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
         try {
-            cache.get(topic).whenComplete((clientCnx, throwable) -> {
-                if (throwable == null) {
-                    if (clientCnx.ctx().channel().isActive()) {
-                        
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
-                        pendingRequests.put(requestId, op);
-                        timer.newTimeout(timeout -> {
-                            OpRequestSend peek = 
pendingRequests.remove(requestId);
-                            if (peek != null && !peek.cb.isDone() && 
!peek.cb.isCompletedExceptionally()) {
-                                peek.cb.completeExceptionally(new 
TransactionBufferClientException
-                                        .RequestTimeoutException());
-                                onResponse(peek);
-                            }
-                        }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
-                        cmd.retain();
-                        clientCnx.ctx().writeAndFlush(cmd, 
clientCnx.ctx().voidPromise());
-                    } else {
-                        cache.invalidate(topic);
-                        cb.completeExceptionally(
-                                new 
PulsarClientException.LookupException(topic + " endTxn channel is not active"));
-                        op.recycle();
-                    }
-                } else {
-                    log.error("endTxn error topic: [{}]", topic, throwable);
-                    cache.invalidate(topic);
-                    cb.completeExceptionally(
-                            new 
PulsarClientException.LookupException(throwable.getMessage()));
-                    op.recycle();
-                }
-            });
+            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, 
lookupCache.get(topic));
+            if (checkRequestCredits(op)) {
+                endTxn(op);
+            }
         } catch (ExecutionException e) {
-            log.error("endTxn channel is not active exception", e);
-            cache.invalidate(topic);
+            log.error("[{}] failed to get client cnx from lookup cache", 
topic, e);
+            lookupCache.invalidate(topic);
             cb.completeExceptionally(new 
PulsarClientException.LookupException(e.getCause().getMessage()));
-            op.recycle();
         }
         return cb;
     }
 
+    private boolean checkRequestCredits(OpRequestSend op) {
+        int currentPermits = REQUEST_CREDITS_UPDATER.get(this);
+        if (currentPermits > 0 && pendingRequests.peek() == null) {
+            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits, 
currentPermits - 1)) {
+                return true;
+            } else {
+                return checkRequestCredits(op);
+            }
+        } else {
+            pendingRequests.add(op);
+            return false;
+        }
+    }
+
+    public void endTxn(OpRequestSend op) {
+        op.cnx.whenComplete((clientCnx, throwable) -> {
+            if (throwable == null) {
+                if (clientCnx.ctx().channel().isActive()) {
+                    
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                    outstandingRequests.put(op.requestId, op);
+                    timer.newTimeout(timeout -> {
+                        OpRequestSend peek = 
outstandingRequests.remove(op.requestId);
+                        if (peek != null && !peek.cb.isDone() && 
!peek.cb.isCompletedExceptionally()) {
+                            peek.cb.completeExceptionally(new 
TransactionBufferClientException
+                                    .RequestTimeoutException());
+                            onResponse(peek);
+                        }
+                    }, operationTimeoutInMills, TimeUnit.MILLISECONDS);
+                    op.cmd.retain();
+                    clientCnx.ctx().writeAndFlush(op.cmd, 
clientCnx.ctx().voidPromise());
+                } else {
+                    invalidateLookupCache(op);
+                    op.cb.completeExceptionally(
+                            new PulsarClientException.LookupException(op.topic 
+ " endTxn channel is not active"));
+                    onResponse(op);
+                }
+            } else {
+                log.error("endTxn error topic: [{}]", op.topic, throwable);
+                invalidateLookupCache(op);
+                op.cb.completeExceptionally(
+                        new 
PulsarClientException.LookupException(throwable.getMessage()));
+                onResponse(op);
+            }
+        });
+    }
+
     @Override
     public void handleEndTxnOnTopicResponse(long requestId, 
CommandEndTxnOnPartitionResponse response) {
-        OpRequestSend op = pendingRequests.remove(requestId);
+        OpRequestSend op = outstandingRequests.remove(requestId);
         if (op == null) {
             if (log.isDebugEnabled()) {
                 log.debug("Got end txn on topic response for timeout {} - {}", 
response.getTxnidMostBits(),
                         response.getTxnidLeastBits());
             }
             return;
         }
-
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on topic response for for request 
{}", op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on topic response for for 
request {}", op.topic,
+                            response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on topic response for request {} 
error {}", op.topic,
+                        response.getRequestId(),
+                        response.getError());
+                invalidateLookupCache(op);
+                
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on topic response for request {} error 
{}", op.topic, response.getRequestId(),
-                    response.getError());
-            cache.invalidate(op.topic);
-            
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
 response.getMessage()));
+        } catch (Exception e) {

Review comment:
       Do we need this try-catch?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -183,41 +225,98 @@ public void handleEndTxnOnSubscriptionResponse(long 
requestId,
             return;
         }
 
-        if (!response.hasError()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Got end txn on subscription response for for 
request {}",
-                        op.topic, response.getRequestId());
+        try {
+            if (!response.hasError()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got end txn on subscription response for 
for request {}",
+                            op.topic, response.getRequestId());
+                }
+                op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
+            } else {
+                log.error("[{}] Got end txn on subscription response for 
request {} error {}",
+                        op.topic, response.getRequestId(), 
response.getError());
+                invalidateLookupCache(op);
+                
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
+                        response.getMessage()));
             }
-            op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
-        } else {
-            log.error("[{}] Got end txn on subscription response for request 
{} error {}",
-                    op.topic, response.getRequestId(), response.getError());
-            cache.invalidate(op.topic);
-            
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
 response.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Got exception when complete EndTxnOnSub op for 
request {}", op.topic, e);
+        } finally {
+            onResponse(op);
         }
-        onResponse(op);
     }
 
-    void onResponse(OpRequestSend op) {
-        ReferenceCountUtil.safeRelease(op.byteBuf);
-        op.recycle();
+    public void onResponse(OpRequestSend op) {
+        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+        if (op != null) {
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+        }
+        checkPendingRequests();
     }
 
-    private static final class OpRequestSend {
+    private void checkPendingRequests() {
+        while (true) {
+            int permits = REQUEST_CREDITS_UPDATER.get(this);
+            if (permits > 0 && pendingRequests.peek() != null) {
+                if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, 
permits - 1)) {
+                    OpRequestSend polled = pendingRequests.poll();
+                    if (polled != null) {
+                        try {
+                            if (polled.cnx != lookupCache.get(polled.topic)) {
+                                OpRequestSend invalid = polled;
+                                polled = 
OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+                                        lookupCache.get(invalid.topic));
+                                invalid.recycle();
+                            }
+                            endTxn(polled);
+                        } catch (ExecutionException e) {
+                            log.error("[{}] failed to get client cnx from 
lookup cache", polled.topic, e);
+                            lookupCache.invalidate(polled.topic);
+                            polled.cb.completeExceptionally(new 
PulsarClientException.LookupException(
+                                    e.getCause().getMessage()));
+                            REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                        }
+                    } else {
+                        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                    }
+                } else {
+                    checkPendingRequests();

Review comment:
       A plain `continue;` is enough here: the enclosing `while (true)` loop already retries, so the recursive call is unnecessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to