This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2596edc12779fbc15efb8c099860ca1a09bdde47
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 30 14:33:57 2025 +0800

    [improve][client]Improve transaction log when a TXN command timeout (#24230)
    
    (cherry picked from commit 74c3b577c7c78a5f9d72fa28c99db72dda4460b7)
---
 .../client/impl/TransactionMetaStoreHandler.java   | 44 ++++++++++++++++++----
 1 file changed, 37 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index c8c2fa83f94..a1fe78d7290 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -234,7 +234,8 @@ public class TransactionMetaStoreHandler extends HandlerState
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, 
unit.toMillis(timeout));
-        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, 
client);
+        String description = String.format("Create new transaction %s", 
transactionCoordinatorId);
+        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, 
client, description, cnx());
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
@@ -315,8 +316,10 @@ public class TransactionMetaStoreHandler extends HandlerState
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddPartitionToTxn(
                 requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), 
partitions);
+        String description = String.format("Add partition %s to TXN %s", 
String.valueOf(partitions),
+                String.valueOf(txnID));
         OpForVoidCallBack op = OpForVoidCallBack
-                .create(cmd, callback, client);
+                .create(cmd, callback, client, description, cnx());
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
@@ -400,7 +403,9 @@ public class TransactionMetaStoreHandler extends HandlerState
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddSubscriptionToTxn(
                 requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), 
subscriptionList);
-        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client);
+        String description = String.format("Add subscription %s to TXN %s", 
toStringSubscriptionList(subscriptionList),
+                String.valueOf(txnID));
+        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client, 
description, cnx());
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
@@ -411,6 +416,17 @@ public class TransactionMetaStoreHandler extends HandlerState
         return callback;
     }
 
+    /**
+     * Renders a list of subscriptions as a single human-readable string.
+     *
+     * @param list the subscriptions to render; may be {@code null} or empty
+     * @return {@code "[]"} for a null or empty list, otherwise every entry rendered as
+     *         {@code "<topic> <subscription>"}, joined by {@code ", "} and wrapped in brackets
+     */
+    private String toStringSubscriptionList(List<Subscription> list) {
+        if (list == null || list.isEmpty()) {
+            return "[]";
+        }
+        StringBuilder builder = new StringBuilder("[");
+        String separator = "";
+        for (Subscription subscription : list) {
+            // Separate entries explicitly; otherwise adjacent entries run together ("t1 s1t2 s2").
+            builder.append(separator)
+                    .append(subscription.getTopic())
+                    .append(' ')
+                    .append(subscription.getSubscription());
+            separator = ", ";
+        }
+        return builder.append("]").toString();
+    }
+
     public void 
handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse 
response) {
         final boolean hasError = response.hasError();
         final ServerError error;
@@ -482,7 +498,8 @@ public class TransactionMetaStoreHandler extends HandlerState
         long requestId = client.newRequestId();
         BaseCommand cmd = Commands.newEndTxn(requestId, 
txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
         ByteBuf buf = Commands.serializeWithSize(cmd);
-        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client);
+        String description = String.format("End [%s] TXN %s", 
String.valueOf(action), String.valueOf(txnID));
+        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client, 
description, cnx());
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
@@ -572,13 +589,16 @@ public class TransactionMetaStoreHandler extends HandlerState
         protected ByteBuf cmd;
         protected CompletableFuture<T> callback;
         protected Backoff backoff;
+        protected String description;
+        protected ClientCnx clientCnx;
 
         abstract void recycle();
     }
 
     private static class OpForTxnIdCallBack extends OpBase<TxnID> {
 
-        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> 
callback, PulsarClientImpl client) {
+        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> 
callback, PulsarClientImpl client,
+                                         String description, ClientCnx 
clientCnx) {
             OpForTxnIdCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
@@ -588,6 +608,8 @@ public class TransactionMetaStoreHandler extends HandlerState
                     
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, 
TimeUnit.NANOSECONDS)
                     .setMandatoryStop(0, TimeUnit.MILLISECONDS)
                     .create();
+            op.description = description;
+            op.clientCnx = clientCnx;
             return op;
         }
 
@@ -600,6 +622,8 @@ public class TransactionMetaStoreHandler extends HandlerState
             this.backoff = null;
             this.cmd = null;
             this.callback = null;
+            this.description = null;
+            this.clientCnx = null;
             recyclerHandle.recycle(this);
         }
 
@@ -615,7 +639,8 @@ public class TransactionMetaStoreHandler extends HandlerState
     private static class OpForVoidCallBack extends OpBase<Void> {
 
 
-        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> 
callback, PulsarClientImpl client) {
+        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> 
callback, PulsarClientImpl client,
+                                        String description, ClientCnx 
clientCnx) {
             OpForVoidCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
@@ -625,6 +650,8 @@ public class TransactionMetaStoreHandler extends HandlerState
                     
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, 
TimeUnit.NANOSECONDS)
                     .setMandatoryStop(0, TimeUnit.MILLISECONDS)
                     .create();
+            op.description = description;
+            op.clientCnx = clientCnx;
             return op;
         }
 
@@ -637,6 +664,8 @@ public class TransactionMetaStoreHandler extends HandlerState
             this.backoff = null;
             this.cmd = null;
             this.callback = null;
+            this.description = null;
+            this.clientCnx = null;
             recyclerHandle.recycle(this);
         }
 
@@ -744,7 +773,8 @@ public class TransactionMetaStoreHandler extends HandlerState
                     OpBase<?> op = 
pendingRequests.remove(lastPolled.requestId);
                     if (op != null && !op.callback.isDone()) {
                         op.callback.completeExceptionally(new 
PulsarClientException.TimeoutException(
-                                "Could not get response from transaction meta 
store within given timeout."));
+                            String.format("%s failed due to timeout. 
connection: %s. pending-queue: %s",
+                                op.description, op.clientCnx, 
pendingRequests.size())));
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Transaction coordinator request {} is 
timeout.", lastPolled.requestId);
                         }

Reply via email to