This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74c3b577c7c [improve][client]Improve transaction log when a TXN
command timeout (#24230)
74c3b577c7c is described below
commit 74c3b577c7c78a5f9d72fa28c99db72dda4460b7
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 30 14:33:57 2025 +0800
[improve][client]Improve transaction log when a TXN command timeout (#24230)
---
.../client/impl/TransactionMetaStoreHandler.java | 44 ++++++++++++++++++----
1 file changed, 37 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index c8c2fa83f94..a1fe78d7290 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -234,7 +234,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId,
unit.toMillis(timeout));
- OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback,
client);
+ String description = String.format("Create new transaction %s",
transactionCoordinatorId);
+ OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback,
client, description, cnx());
internalPinnedExecutor.execute(() -> {
pendingRequests.put(requestId, op);
timeoutQueue.add(new RequestTime(System.currentTimeMillis(),
requestId));
@@ -315,8 +316,10 @@ public class TransactionMetaStoreHandler extends
HandlerState
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newAddPartitionToTxn(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
partitions);
+ String description = String.format("Add partition %s to TXN %s",
String.valueOf(partitions),
+ String.valueOf(txnID));
OpForVoidCallBack op = OpForVoidCallBack
- .create(cmd, callback, client);
+ .create(cmd, callback, client, description, cnx());
internalPinnedExecutor.execute(() -> {
pendingRequests.put(requestId, op);
timeoutQueue.add(new RequestTime(System.currentTimeMillis(),
requestId));
@@ -400,7 +403,9 @@ public class TransactionMetaStoreHandler extends
HandlerState
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newAddSubscriptionToTxn(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
subscriptionList);
- OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client);
+ String description = String.format("Add subscription %s to TXN %s",
toStringSubscriptionList(subscriptionList),
+ String.valueOf(txnID));
+ OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client,
description, cnx());
internalPinnedExecutor.execute(() -> {
pendingRequests.put(requestId, op);
timeoutQueue.add(new RequestTime(System.currentTimeMillis(),
requestId));
@@ -411,6 +416,17 @@ public class TransactionMetaStoreHandler extends
HandlerState
return callback;
}
+    /**
+     * Renders the subscriptions as {@code [topic subscription, topic subscription, ...]} so the
+     * result can be embedded in the description of an "add subscription to TXN" request.
+     *
+     * @param list subscriptions to render; {@code null} or empty yields {@code "[]"}
+     * @return the bracketed, comma-separated rendering of {@code list}
+     */
+    private String toStringSubscriptionList(List<Subscription> list) {
+        if (list == null || list.isEmpty()) {
+            return "[]";
+        }
+        StringBuilder builder = new StringBuilder("[");
+        for (Subscription subscription : list) {
+            // Separate entries; without this, adjacent "topic subscription" pairs run together
+            // and the resulting description cannot be read unambiguously.
+            if (builder.length() > 1) {
+                builder.append(", ");
+            }
+            builder.append(subscription.getTopic()).append(' ').append(subscription.getSubscription());
+        }
+        return builder.append("]").toString();
+    }
+
public void
handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse
response) {
final boolean hasError = response.hasError();
final ServerError error;
@@ -482,7 +498,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
long requestId = client.newRequestId();
BaseCommand cmd = Commands.newEndTxn(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
ByteBuf buf = Commands.serializeWithSize(cmd);
- OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client);
+ String description = String.format("End [%s] TXN %s",
String.valueOf(action), String.valueOf(txnID));
+ OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client,
description, cnx());
internalPinnedExecutor.execute(() -> {
pendingRequests.put(requestId, op);
timeoutQueue.add(new RequestTime(System.currentTimeMillis(),
requestId));
@@ -572,13 +589,16 @@ public class TransactionMetaStoreHandler extends
HandlerState
protected ByteBuf cmd;
protected CompletableFuture<T> callback;
protected Backoff backoff;
+ protected String description;
+ protected ClientCnx clientCnx;
abstract void recycle();
}
private static class OpForTxnIdCallBack extends OpBase<TxnID> {
- static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID>
callback, PulsarClientImpl client) {
+ static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID>
callback, PulsarClientImpl client,
+ String description, ClientCnx
clientCnx) {
OpForTxnIdCallBack op = RECYCLER.get();
op.callback = callback;
op.cmd = cmd;
@@ -588,6 +608,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10,
TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();
+ op.description = description;
+ op.clientCnx = clientCnx;
return op;
}
@@ -600,6 +622,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
this.backoff = null;
this.cmd = null;
this.callback = null;
+ this.description = null;
+ this.clientCnx = null;
recyclerHandle.recycle(this);
}
@@ -615,7 +639,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
private static class OpForVoidCallBack extends OpBase<Void> {
- static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void>
callback, PulsarClientImpl client) {
+ static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void>
callback, PulsarClientImpl client,
+ String description, ClientCnx
clientCnx) {
OpForVoidCallBack op = RECYCLER.get();
op.callback = callback;
op.cmd = cmd;
@@ -625,6 +650,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10,
TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();
+ op.description = description;
+ op.clientCnx = clientCnx;
return op;
}
@@ -637,6 +664,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
this.backoff = null;
this.cmd = null;
this.callback = null;
+ this.description = null;
+ this.clientCnx = null;
recyclerHandle.recycle(this);
}
@@ -744,7 +773,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
OpBase<?> op =
pendingRequests.remove(lastPolled.requestId);
if (op != null && !op.callback.isDone()) {
op.callback.completeExceptionally(new
PulsarClientException.TimeoutException(
- "Could not get response from transaction meta
store within given timeout."));
+ String.format("%s failed due to timeout.
connection: %s. pending-queue: %s",
+ op.description, op.clientCnx,
pendingRequests.size())));
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction coordinator request {} is
timeout.", lastPolled.requestId);
}