This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b09fd872aa61138fc979e05868096c21fdcf0509
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 15 12:48:41 2021 +0800

    [Transaction] Remove request if can not send (#13308)
    
    (cherry picked from commit eb42df7126ac4015c67f6989ec083ee173dce3f4)
---
 .../client/impl/TransactionMetaStoreHandler.java   | 44 +++++++++++++++-------
 1 file changed, 30 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index b2b756a..3c6286b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -200,7 +200,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -249,7 +251,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -279,7 +283,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
 
         return callback;
@@ -329,7 +335,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -360,7 +368,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -408,7 +418,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -437,7 +449,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
         internalPinnedExecutor.execute(() -> {
             pendingRequests.put(requestId, op);
             timeoutQueue.add(new RequestTime(System.currentTimeMillis(), 
requestId));
-            checkStateAndSendRequest(op);
+            if (!checkStateAndSendRequest(op)) {
+                pendingRequests.remove(requestId);
+            }
         });
         return callback;
     }
@@ -486,7 +500,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                                         }
                                         return;
                                     }
-                                    checkStateAndSendRequest(op);
+                                    if (!checkStateAndSendRequest(op)) {
+                                        pendingRequests.remove(requestId);
+                                    }
                                 });
                             }
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
@@ -634,7 +650,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
         return true;
     }
 
-    private void checkStateAndSendRequest(OpBase<?> op) {
+    private boolean checkStateAndSendRequest(OpBase<?> op) {
         switch (getState()) {
             case Ready:
                 ClientCnx cnx = cnx();
@@ -644,9 +660,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                 } else {
                     LOG.error("The cnx was null when the TC handler was 
ready", new NullPointerException());
                 }
-                break;
+                return true;
             case Connecting:
-                break;
+                return true;
             case Closing:
             case Closed:
                 op.callback.completeExceptionally(
@@ -655,7 +671,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                                         + transactionCoordinatorId
                                         + " is closing or closed."));
                 onResponse(op);
-                break;
+                return false;
             case Failed:
             case Uninitialized:
                 op.callback.completeExceptionally(
@@ -664,13 +680,13 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                                         + transactionCoordinatorId
                                         + " not connected."));
                 onResponse(op);
-                break;
+                return false;
             default:
                 op.callback.completeExceptionally(
                         new 
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 transactionCoordinatorId));
                 onResponse(op);
-                break;
+                return false;
         }
     }
 

Reply via email to