This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1333ea7c4d7 IoTConsensus: Skip retry sending batch caused by 
TApplicationException (#12243)
1333ea7c4d7 is described below

commit 1333ea7c4d7a579320b376451dbb954c5fe92359
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Mar 27 18:29:29 2024 +0800

    IoTConsensus: Skip retry sending batch caused by TApplicationException 
(#12243)
---
 .../consensus/iot/client/DispatchLogHandler.java   | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 88a8d90ebaa..0e03b97a111 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.thrift.TApplicationException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,10 +74,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
           messages);
       sleepCorrespondingTimeAndRetryAsynchronous();
     } else {
-      thread.getSyncStatus().removeBatch(batch);
-      // update safely deleted search index after last flushed sync index may 
be updated by
-      // removeBatch
-      thread.updateSafelyDeletedSearchIndex();
+      completeBatch(batch);
     }
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
   }
@@ -91,12 +89,20 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
   public void onError(Exception exception) {
     ++retryCount;
     if (logger.isWarnEnabled()) {
+      Throwable rootCause = ExceptionUtils.getRootCause(exception);
       logger.warn(
           "Can not send {} to peer for {} times {} because {}",
           batch,
           thread.getPeer(),
           retryCount,
-          ExceptionUtils.getRootCause(exception).toString());
+          rootCause.toString());
+      // skip TApplicationException caused by follower
+      if (rootCause instanceof TApplicationException) {
+        completeBatch(batch);
+        logger.warn("Skip retrying this Batch {} because of 
TApplicationException.", batch);
+        
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
+        return;
+      }
     }
     sleepCorrespondingTimeAndRetryAsynchronous();
   }
@@ -127,4 +133,11 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
             sleepTime,
             TimeUnit.MILLISECONDS);
   }
+
+  private void completeBatch(Batch batch) {
+    thread.getSyncStatus().removeBatch(batch);
+    // update safely deleted search index after last flushed sync index may be 
updated by
+    // removeBatch
+    thread.updateSafelyDeletedSearchIndex();
+  }
 }

Reply via email to