This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new efa2f771079 IoTV2: Make consensus event retry forever. (#15565)
efa2f771079 is described below

commit efa2f771079c0d8c3a82cccdcda284b7a81f60e1
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed May 28 11:38:28 2025 +0800

    IoTV2: Make consensus event retry forever. (#15565)
---
 .../pipeconsensus/PipeConsensusSyncConnector.java  | 30 +++++++++++-----------
 ...sensusRetryWithIncreasingIntervalException.java |  9 ++++---
 .../task/subtask/PipeAbstractConnectorSubtask.java |  4 +--
 .../agent/task/subtask/PipeReportableSubtask.java  | 30 ++++++++++++----------
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  5 ++--
 5 files changed, 41 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 9c2953cf79b..d5ba9d04120 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
 import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
@@ -53,7 +54,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -146,11 +146,11 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
         pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
       }
     } catch (final Exception e) {
-      throw new PipeConnectionException(
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           String.format(
               "Failed to transfer tablet insertion event %s, because %s.",
               tabletInsertionEvent, e.getMessage()),
-          e);
+          Integer.MAX_VALUE);
     }
   }
 
@@ -169,11 +169,11 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
       final long duration = System.nanoTime() - startTime;
       pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
     } catch (Exception e) {
-      throw new PipeConnectionException(
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           String.format(
               "Failed to transfer tsfile insertion event %s, because %s.",
               tsFileInsertionEvent, e.getMessage()),
-          e);
+          Integer.MAX_VALUE);
     }
   }
 
@@ -215,14 +215,14 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
 
       tabletBatchBuilder.onSuccess();
     } catch (final Exception e) {
-      throw new PipeConnectionException(
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
               getFollowerUrl().getIp(),
               getFollowerUrl().getPort(),
               TABLET_BATCH_SCENARIO,
               e.getMessage()),
-          e);
+          Integer.MAX_VALUE);
     }
   }
 
@@ -265,14 +265,14 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
                   progressIndex,
                   thisDataNodeId));
     } catch (final Exception e) {
-      throw new PipeConnectionException(
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
               getFollowerUrl().getIp(),
               getFollowerUrl().getPort(),
               DELETION_SCENARIO,
               e.getMessage()),
-          e);
+          Integer.MAX_VALUE);
     }
 
     final TSStatus status = resp.getStatus();
@@ -344,14 +344,14 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
                     thisDataNodeId));
       }
     } catch (final Exception e) {
-      throw new PipeConnectionException(
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
               getFollowerUrl().getIp(),
               getFollowerUrl().getPort(),
               TABLET_INSERTION_NODE_SCENARIO,
               e.getMessage()),
-          e);
+          Integer.MAX_VALUE);
     }
 
     final TSStatus status = resp.getStatus();
@@ -418,14 +418,14 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
                     thisDataNodeId));
       }
     } catch (final Exception e) {
-      throw new PipeConnectionException(
+      throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
               getFollowerUrl().getIp(),
               getFollowerUrl().getPort(),
               TSFILE_SCENARIO,
               e.getMessage()),
-          e);
+          Integer.MAX_VALUE);
     }
 
     final TSStatus status = resp.getStatus();
@@ -483,10 +483,10 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
                               tConsensusGroupId,
                               thisDataNodeId)));
         } catch (Exception e) {
-          throw new PipeConnectionException(
+          throw new PipeRuntimeConnectorRetryTimesConfigurableException(
               String.format(
                   "Network error when transfer file %s, because %s.", file, 
e.getMessage()),
-              e);
+              Integer.MAX_VALUE);
         }
 
         position += readLength;
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeConsensusRetryWithIncreasingIntervalException.java
similarity index 78%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeConsensusRetryWithIncreasingIntervalException.java
index 8bc68ef9158..10a84ae520e 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeConsensusRetryWithIncreasingIntervalException.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
 
-public class PipeConsensusRetryWithIncreasingIntervalException extends 
PipeException {
+public class PipeConsensusRetryWithIncreasingIntervalException
+    extends PipeRuntimeConnectorRetryTimesConfigurableException {
 
-  public PipeConsensusRetryWithIncreasingIntervalException(String message) {
-    super(message);
+  public PipeConsensusRetryWithIncreasingIntervalException(String message, int 
retryTimes) {
+    super(message, retryTimes);
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index 9382aa0f671..cfd987758e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import 
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -124,8 +123,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
     // Notice that the PipeRuntimeConnectorCriticalException must be thrown 
here
     // because the upper layer relies on this to stop all the related pipe 
tasks
     // Other exceptions may cause the subtask to stop forever and can not be 
restarted
-    if (throwable instanceof PipeRuntimeConnectorCriticalException
-        || throwable instanceof 
PipeConsensusRetryWithIncreasingIntervalException) {
+    if (throwable instanceof PipeRuntimeConnectorCriticalException) {
       super.onFailure(throwable);
     } else {
       // Print stack trace for better debugging
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 77681522a8f..228229394b7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -19,11 +19,11 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.subtask;
 
+import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +56,20 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
     // is dropped or the process is running normally.
   }
 
+  private long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
+    long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
+    // if receiver is read-only/internal-error/write-reject, connector will 
retry with
+    // power-increasing interval
+    if (throwable instanceof 
PipeConsensusRetryWithIncreasingIntervalException) {
+      if (retryCount.get() >= 5) {
+        sleepInterval = 1000L * 20;
+      } else {
+        sleepInterval = 1000L * retryCount.get() * retryCount.get();
+      }
+    }
+    return sleepInterval;
+  }
+
   private void onEnrichedEventFailure(final Throwable throwable) {
     final int maxRetryTimes =
         throwable instanceof 
PipeRuntimeConnectorRetryTimesConfigurableException
@@ -85,7 +99,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
           throwable.getMessage(),
           throwable);
       try {
-        Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
+        Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
       } catch (final InterruptedException e) {
         LOGGER.warn(
             "Interrupted when retrying to execute subtask {} (creation time: 
{}, simple class: {})",
@@ -152,17 +166,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
         throwable.getMessage(),
         throwable);
     try {
-      long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
-      // if receiver is read-only/internal-error/write-reject, connector will 
retry will
-      // power-increasing interval
-      if (throwable instanceof 
PipeConsensusRetryWithIncreasingIntervalException) {
-        if (retryCount.get() >= 5) {
-          sleepInterval = 1000L * 20;
-        } else {
-          sleepInterval = 1000L * retryCount.get() * retryCount.get();
-        }
-      }
-      Thread.sleep(sleepInterval);
+      Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
     } catch (final InterruptedException e) {
       LOGGER.warn(
           "Interrupted when retrying to execute subtask {} (creation time: {}, 
simple class: {})",
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index f918fb644ff..83665780f77 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -20,12 +20,12 @@
 package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.pipe.api.event.Event;
-import 
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -98,7 +98,8 @@ public class PipeReceiverStatusHandler {
 
     if (RetryUtils.needRetryForConsensus(status.getCode())) {
       LOGGER.info("IoTConsensusV2: will retry with increasing interval. 
status: {}", status);
-      throw new 
PipeConsensusRetryWithIncreasingIntervalException(exceptionMessage);
+      throw new PipeConsensusRetryWithIncreasingIntervalException(
+          exceptionMessage, Integer.MAX_VALUE);
     }
 
     switch (status.getCode()) {

Reply via email to