This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b09fd872aa61138fc979e05868096c21fdcf0509
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 15 12:48:41 2021 +0800

    [Transaction] Remove request if can not send (#13308)

    (cherry picked from commit eb42df7126ac4015c67f6989ec083ee173dce3f4)
---
 .../client/impl/TransactionMetaStoreHandler.java   | 44 +++++++++++++++-------
 1 file changed, 30 insertions(+), 14 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index b2b756a..3c6286b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -200,7 +200,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -249,7 +251,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                             }
                             return;
                         }
-                        checkStateAndSendRequest(op);
+                        if (!checkStateAndSendRequest(op)) {
+                            pendingRequests.remove(requestId);
+                        }
                     });
                 }
                 , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -279,7 +283,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
 
@@ -329,7 +335,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                             }
                             return;
                         }
-                        checkStateAndSendRequest(op);
+                        if (!checkStateAndSendRequest(op)) {
+                            pendingRequests.remove(requestId);
+                        }
                     });
                 }
                 , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -360,7 +368,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -408,7 +418,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                             }
                             return;
                         }
-                        checkStateAndSendRequest(op);
+                        if (!checkStateAndSendRequest(op)) {
+                            pendingRequests.remove(requestId);
+                        }
                     });
                 }
                 , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -437,7 +449,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -486,7 +500,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                             }
                             return;
                         }
-                        checkStateAndSendRequest(op);
+                        if (!checkStateAndSendRequest(op)) {
+                            pendingRequests.remove(requestId);
+                        }
                     });
                 }
                 , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -634,7 +650,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         return true;
     }
 
-    private void checkStateAndSendRequest(OpBase<?> op) {
+    private boolean checkStateAndSendRequest(OpBase<?> op) {
         switch (getState()) {
             case Ready:
                 ClientCnx cnx = cnx();
@@ -644,9 +660,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                 } else {
                     LOG.error("The cnx was null when the TC handler was ready", new NullPointerException());
                 }
-                break;
+                return true;
             case Connecting:
-                break;
+                return true;
             case Closing:
             case Closed:
                 op.callback.completeExceptionally(
@@ -655,7 +671,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         + transactionCoordinatorId
                                         + " is closing or closed."));
                 onResponse(op);
-                break;
+                return false;
             case Failed:
             case Uninitialized:
                 op.callback.completeExceptionally(
@@ -664,13 +680,13 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                                         + transactionCoordinatorId
                                         + " not connected."));
                 onResponse(op);
-                break;
+                return false;
             default:
                 op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 transactionCoordinatorId));
                 onResponse(op);
-                break;
+                return false;
         }
     }
 
