This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1333ea7c4d7 IoTConsensus: Skip retry sending batch caused by TApplicationException (#12243)
1333ea7c4d7 is described below
commit 1333ea7c4d7a579320b376451dbb954c5fe92359
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Mar 27 18:29:29 2024 +0800
IoTConsensus: Skip retry sending batch caused by TApplicationException (#12243)
---
.../consensus/iot/client/DispatchLogHandler.java | 23 +++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 88a8d90ebaa..0e03b97a111 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.thrift.TApplicationException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,10 +74,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
messages);
sleepCorrespondingTimeAndRetryAsynchronous();
} else {
- thread.getSyncStatus().removeBatch(batch);
- // update safely deleted search index after last flushed sync index may be updated by
- // removeBatch
- thread.updateSafelyDeletedSearchIndex();
+ completeBatch(batch);
}
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
}
@@ -91,12 +89,20 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
public void onError(Exception exception) {
++retryCount;
if (logger.isWarnEnabled()) {
+ Throwable rootCause = ExceptionUtils.getRootCause(exception);
logger.warn(
"Can not send {} to peer for {} times {} because {}",
batch,
thread.getPeer(),
retryCount,
- ExceptionUtils.getRootCause(exception).toString());
+ rootCause.toString());
+ // skip TApplicationException caused by follower
+ if (rootCause instanceof TApplicationException) {
+ completeBatch(batch);
+ logger.warn("Skip retrying this Batch {} because of TApplicationException.", batch);
+ logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
+ return;
+ }
}
sleepCorrespondingTimeAndRetryAsynchronous();
}
@@ -127,4 +133,11 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
sleepTime,
TimeUnit.MILLISECONDS);
}
+
+ private void completeBatch(Batch batch) {
+ thread.getSyncStatus().removeBatch(batch);
+ // update safely deleted search index after last flushed sync index may be updated by
+ // removeBatch
+ thread.updateSafelyDeletedSearchIndex();
+ }
}