This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new efa2f771079 IoTV2: Make consensus event retry forever. (#15565)
efa2f771079 is described below
commit efa2f771079c0d8c3a82cccdcda284b7a81f60e1
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed May 28 11:38:28 2025 +0800
IoTV2: Make consensus event retry forever. (#15565)
---
.../pipeconsensus/PipeConsensusSyncConnector.java | 30 +++++++++++-----------
...sensusRetryWithIncreasingIntervalException.java | 9 ++++---
.../task/subtask/PipeAbstractConnectorSubtask.java | 4 +--
.../agent/task/subtask/PipeReportableSubtask.java | 30 ++++++++++++----------
.../pipe/receiver/PipeReceiverStatusHandler.java | 5 ++--
5 files changed, 41 insertions(+), 37 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 9c2953cf79b..d5ba9d04120 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
@@ -53,7 +54,6 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -146,11 +146,11 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
}
} catch (final Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
"Failed to transfer tablet insertion event %s, because %s.",
tabletInsertionEvent, e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
}
@@ -169,11 +169,11 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
final long duration = System.nanoTime() - startTime;
pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
} catch (Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
"Failed to transfer tsfile insertion event %s, because %s.",
tsFileInsertionEvent, e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
}
@@ -215,14 +215,14 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
tabletBatchBuilder.onSuccess();
} catch (final Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
TABLET_BATCH_SCENARIO,
e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
}
@@ -265,14 +265,14 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
progressIndex,
thisDataNodeId));
} catch (final Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
DELETION_SCENARIO,
e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
final TSStatus status = resp.getStatus();
@@ -344,14 +344,14 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
thisDataNodeId));
}
} catch (final Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
TABLET_INSERTION_NODE_SCENARIO,
e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
final TSStatus status = resp.getStatus();
@@ -418,14 +418,14 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
thisDataNodeId));
}
} catch (final Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
TSFILE_SCENARIO,
e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
final TSStatus status = resp.getStatus();
@@ -483,10 +483,10 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
tConsensusGroupId,
thisDataNodeId)));
} catch (Exception e) {
- throw new PipeConnectionException(
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
"Network error when transfer file %s, because %s.", file,
e.getMessage()),
- e);
+ Integer.MAX_VALUE);
}
position += readLength;
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeConsensusRetryWithIncreasingIntervalException.java
similarity index 78%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeConsensusRetryWithIncreasingIntervalException.java
index 8bc68ef9158..10a84ae520e 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeConsensusRetryWithIncreasingIntervalException.java
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
-public class PipeConsensusRetryWithIncreasingIntervalException extends
PipeException {
+public class PipeConsensusRetryWithIncreasingIntervalException
+ extends PipeRuntimeConnectorRetryTimesConfigurableException {
- public PipeConsensusRetryWithIncreasingIntervalException(String message) {
- super(message);
+ public PipeConsensusRetryWithIncreasingIntervalException(String message, int
retryTimes) {
+ super(message, retryTimes);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index 9382aa0f671..cfd987758e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -124,8 +123,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
// Notice that the PipeRuntimeConnectorCriticalException must be thrown here
// because the upper layer relies on this to stop all the related pipe tasks
// Other exceptions may cause the subtask to stop forever and can not be restarted
- if (throwable instanceof PipeRuntimeConnectorCriticalException
- || throwable instanceof
PipeConsensusRetryWithIncreasingIntervalException) {
+ if (throwable instanceof PipeRuntimeConnectorCriticalException) {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 77681522a8f..228229394b7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.commons.pipe.agent.task.subtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +56,20 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
// is dropped or the process is running normally.
}
+ private long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
+ long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
+ // if receiver is read-only/internal-error/write-reject, connector will retry with
+ // power-increasing interval
+ if (throwable instanceof
PipeConsensusRetryWithIncreasingIntervalException) {
+ if (retryCount.get() >= 5) {
+ sleepInterval = 1000L * 20;
+ } else {
+ sleepInterval = 1000L * retryCount.get() * retryCount.get();
+ }
+ }
+ return sleepInterval;
+ }
+
private void onEnrichedEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof
PipeRuntimeConnectorRetryTimesConfigurableException
@@ -85,7 +99,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
throwable.getMessage(),
throwable);
try {
- Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
+ Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
@@ -152,17 +166,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
throwable.getMessage(),
throwable);
try {
- long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
- // if receiver is read-only/internal-error/write-reject, connector will retry will
- // power-increasing interval
- if (throwable instanceof
PipeConsensusRetryWithIncreasingIntervalException) {
- if (retryCount.get() >= 5) {
- sleepInterval = 1000L * 20;
- } else {
- sleepInterval = 1000L * retryCount.get() * retryCount.get();
- }
- }
- Thread.sleep(sleepInterval);
+ Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index f918fb644ff..83665780f77 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -20,12 +20,12 @@
package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.pipe.api.event.Event;
-import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -98,7 +98,8 @@ public class PipeReceiverStatusHandler {
if (RetryUtils.needRetryForConsensus(status.getCode())) {
LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status);
- throw new
PipeConsensusRetryWithIncreasingIntervalException(exceptionMessage);
+ throw new PipeConsensusRetryWithIncreasingIntervalException(
+ exceptionMessage, Integer.MAX_VALUE);
}
switch (status.getCode()) {