This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb18d24a01 IoTConsensusV2: Retry with power-increasing interval when 
follower is read-only (#15075)
bcb18d24a01 is described below

commit bcb18d24a01eadea7369d884d2dace7563552d0c
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Mar 14 17:32:05 2025 +0800

    IoTConsensusV2: Retry with power-increasing interval when follower is 
read-only (#15075)
    
    * retry with power-increasing interval when follower is read-only
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
---
 ...sensusRetryWithIncreasingIntervalException.java | 29 +++-------------------
 .../consensus/iot/client/DispatchLogHandler.java   | 27 +++++++++-----------
 .../task/subtask/PipeAbstractConnectorSubtask.java |  4 ++-
 .../agent/task/subtask/PipeReportableSubtask.java  | 13 +++++++++-
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  8 ++++++
 .../org/apache/iotdb/commons/utils/RetryUtils.java |  8 ++++++
 6 files changed, 47 insertions(+), 42 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
similarity index 58%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
copy to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
index 19a6456ec30..8bc68ef9158 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
@@ -17,32 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.pipe.api.exception;
 
-public class RetryUtils {
+public class PipeConsensusRetryWithIncreasingIntervalException extends 
PipeException {
 
-  public interface CallableWithException<T, E extends Exception> {
-    T call() throws E;
-  }
-
-  public static final int MAX_RETRIES = 3;
-
-  public static <T, E extends Exception> T retryOnException(
-      final CallableWithException<T, E> callable) throws E {
-    int attempt = 0;
-    while (true) {
-      try {
-        return callable.call();
-      } catch (Exception e) {
-        attempt++;
-        if (attempt >= MAX_RETRIES) {
-          throw e;
-        }
-      }
-    }
-  }
-
-  private RetryUtils() {
-    // utility class
+  public PipeConsensusRetryWithIncreasingIntervalException(String message) {
+    super(message);
   }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 9b0979bf6be..d587e25ee19 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
 import 
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
@@ -45,6 +46,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
   private final long createTime;
   private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
   private int retryCount;
+  private long retryInterval;
 
   public DispatchLogHandler(
       LogDispatcherThread thread,
@@ -54,14 +56,16 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
     this.logDispatcherThreadMetrics = logDispatcherThreadMetrics;
     this.batch = batch;
     this.createTime = System.nanoTime();
+    this.retryInterval = 
thread.getConfig().getReplication().getBasicRetryWaitTimeMs();
   }
 
   @Override
   public void onComplete(TSyncLogEntriesRes response) {
-    if (response.getStatuses().stream().anyMatch(status -> 
needRetry(status.getCode()))) {
+    if (response.getStatuses().stream()
+        .anyMatch(status -> 
RetryUtils.needRetryForConsensus(status.getCode()))) {
       List<String> retryStatusMessages =
           response.getStatuses().stream()
-              .filter(status -> needRetry(status.getCode()))
+              .filter(status -> 
RetryUtils.needRetryForConsensus(status.getCode()))
               .map(TSStatus::getMessage)
               .collect(Collectors.toList());
 
@@ -92,12 +96,6 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
   }
 
-  public static boolean needRetry(int statusCode) {
-    return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
-        || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
-        || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
-  }
-
   @Override
   public void onError(Exception exception) {
     ++retryCount;
@@ -119,12 +117,11 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
   }
 
   private void sleepCorrespondingTimeAndRetryAsynchronous() {
-    long sleepTime =
-        Math.min(
-            (long)
-                (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
-                    * Math.pow(2, retryCount)),
-            thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
+    if (retryInterval != 
thread.getConfig().getReplication().getMaxRetryWaitTimeMs()) {
+      retryInterval =
+          Math.min(retryInterval * 2, 
thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
+    }
+
     thread
         .getImpl()
         .getBackgroundTaskService()
@@ -141,7 +138,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
                 thread.sendBatchAsync(batch, this);
               }
             },
-            sleepTime,
+            retryInterval,
             TimeUnit.MILLISECONDS);
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index cfd987758e4..9382aa0f671 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import 
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -123,7 +124,8 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
     // Notice that the PipeRuntimeConnectorCriticalException must be thrown 
here
     // because the upper layer relies on this to stop all the related pipe 
tasks
     // Other exceptions may cause the subtask to stop forever and can not be 
restarted
-    if (throwable instanceof PipeRuntimeConnectorCriticalException) {
+    if (throwable instanceof PipeRuntimeConnectorCriticalException
+        || throwable instanceof 
PipeConsensusRetryWithIncreasingIntervalException) {
       super.onFailure(throwable);
     } else {
       // Print stack trace for better debugging
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index aa50bdd7576..77681522a8f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesCon
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,7 +152,17 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
         throwable.getMessage(),
         throwable);
     try {
-      Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
+      long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
+      // if receiver is read-only/internal-error/write-reject, connector will 
retry will
+      // power-increasing interval
+      if (throwable instanceof 
PipeConsensusRetryWithIncreasingIntervalException) {
+        if (retryCount.get() >= 5) {
+          sleepInterval = 1000L * 20;
+        } else {
+          sleepInterval = 1000L * retryCount.get() * retryCount.get();
+        }
+      }
+      Thread.sleep(sleepInterval);
     } catch (final InterruptedException e) {
       LOGGER.warn(
           "Interrupted when retrying to execute subtask {} (creation time: {}, 
simple class: {})",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 0f87d361bb0..dca8563d79c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.pipe.api.event.Event;
+import 
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -90,6 +92,12 @@ public class PipeReceiverStatusHandler {
    */
   public void handle(
       final TSStatus status, final String exceptionMessage, final String 
recordMessage) {
+
+    if (RetryUtils.needRetryForConsensus(status.getCode())) {
+      LOGGER.info("IoTConsensusV2: will retry with increasing interval. 
status: {}", status);
+      throw new 
PipeConsensusRetryWithIncreasingIntervalException(exceptionMessage);
+    }
+
     switch (status.getCode()) {
       case 200: // SUCCESS_STATUS
       case 400: // REDIRECTION_RECOMMEND
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index 19a6456ec30..c0ea52e6b74 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -19,12 +19,20 @@
 
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.rpc.TSStatusCode;
+
 public class RetryUtils {
 
   public interface CallableWithException<T, E extends Exception> {
     T call() throws E;
   }
 
+  public static boolean needRetryForConsensus(int statusCode) {
+    return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
+        || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
+  }
+
   public static final int MAX_RETRIES = 3;
 
   public static <T, E extends Exception> T retryOnException(

Reply via email to