This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bcb18d24a01 IoTConsensusV2: Retry with power-increasing interval when
follower is read-only (#15075)
bcb18d24a01 is described below
commit bcb18d24a01eadea7369d884d2dace7563552d0c
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Mar 14 17:32:05 2025 +0800
IoTConsensusV2: Retry with power-increasing interval when follower is
read-only (#15075)
* retry with power-increasing interval when follower is read-only
* fix review
* fix review
* fix review
* fix review
* fix review
---
...sensusRetryWithIncreasingIntervalException.java | 29 +++-------------------
.../consensus/iot/client/DispatchLogHandler.java | 27 +++++++++-----------
.../task/subtask/PipeAbstractConnectorSubtask.java | 4 ++-
.../agent/task/subtask/PipeReportableSubtask.java | 13 +++++++++-
.../pipe/receiver/PipeReceiverStatusHandler.java | 8 ++++++
.../org/apache/iotdb/commons/utils/RetryUtils.java | 8 ++++++
6 files changed, 47 insertions(+), 42 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
similarity index 58%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
copy to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
index 19a6456ec30..8bc68ef9158 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConsensusRetryWithIncreasingIntervalException.java
@@ -17,32 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.pipe.api.exception;
-public class RetryUtils {
+public class PipeConsensusRetryWithIncreasingIntervalException extends
PipeException {
- public interface CallableWithException<T, E extends Exception> {
- T call() throws E;
- }
-
- public static final int MAX_RETRIES = 3;
-
- public static <T, E extends Exception> T retryOnException(
- final CallableWithException<T, E> callable) throws E {
- int attempt = 0;
- while (true) {
- try {
- return callable.call();
- } catch (Exception e) {
- attempt++;
- if (attempt >= MAX_RETRIES) {
- throw e;
- }
- }
- }
- }
-
- private RetryUtils() {
- // utility class
+ public PipeConsensusRetryWithIncreasingIntervalException(String message) {
+ super(message);
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 9b0979bf6be..d587e25ee19 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
import
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
@@ -45,6 +46,7 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
private final long createTime;
private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
private int retryCount;
+ private long retryInterval;
public DispatchLogHandler(
LogDispatcherThread thread,
@@ -54,14 +56,16 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
this.logDispatcherThreadMetrics = logDispatcherThreadMetrics;
this.batch = batch;
this.createTime = System.nanoTime();
+ this.retryInterval =
thread.getConfig().getReplication().getBasicRetryWaitTimeMs();
}
@Override
public void onComplete(TSyncLogEntriesRes response) {
- if (response.getStatuses().stream().anyMatch(status ->
needRetry(status.getCode()))) {
+ if (response.getStatuses().stream()
+ .anyMatch(status ->
RetryUtils.needRetryForConsensus(status.getCode()))) {
List<String> retryStatusMessages =
response.getStatuses().stream()
- .filter(status -> needRetry(status.getCode()))
+ .filter(status ->
RetryUtils.needRetryForConsensus(status.getCode()))
.map(TSStatus::getMessage)
.collect(Collectors.toList());
@@ -92,12 +96,6 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() -
createTime);
}
- public static boolean needRetry(int statusCode) {
- return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
- || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
- || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
- }
-
@Override
public void onError(Exception exception) {
++retryCount;
@@ -119,12 +117,11 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
}
private void sleepCorrespondingTimeAndRetryAsynchronous() {
- long sleepTime =
- Math.min(
- (long)
- (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
- * Math.pow(2, retryCount)),
- thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
+ if (retryInterval !=
thread.getConfig().getReplication().getMaxRetryWaitTimeMs()) {
+ retryInterval =
+ Math.min(retryInterval * 2,
thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
+ }
+
thread
.getImpl()
.getBackgroundTaskService()
@@ -141,7 +138,7 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
thread.sendBatchAsync(batch, this);
}
},
- sleepTime,
+ retryInterval,
TimeUnit.MILLISECONDS);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index cfd987758e4..9382aa0f671 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -123,7 +124,8 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
// Notice that the PipeRuntimeConnectorCriticalException must be thrown
here
// because the upper layer relies on this to stop all the related pipe
tasks
// Other exceptions may cause the subtask to stop forever and can not be
restarted
- if (throwable instanceof PipeRuntimeConnectorCriticalException) {
+ if (throwable instanceof PipeRuntimeConnectorCriticalException
+ || throwable instanceof
PipeConsensusRetryWithIncreasingIntervalException) {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index aa50bdd7576..77681522a8f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesCon
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,7 +152,17 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
throwable.getMessage(),
throwable);
try {
- Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
+ long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
+ // if receiver is read-only/internal-error/write-reject, connector will
retry with
+ // power-increasing interval
+ if (throwable instanceof
PipeConsensusRetryWithIncreasingIntervalException) {
+ if (retryCount.get() >= 5) {
+ sleepInterval = 1000L * 20;
+ } else {
+ sleepInterval = 1000L * retryCount.get() * retryCount.get();
+ }
+ }
+ Thread.sleep(sleepInterval);
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {},
simple class: {})",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 0f87d361bb0..dca8563d79c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.pipe.api.event.Event;
+import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -90,6 +92,12 @@ public class PipeReceiverStatusHandler {
*/
public void handle(
final TSStatus status, final String exceptionMessage, final String
recordMessage) {
+
+ if (RetryUtils.needRetryForConsensus(status.getCode())) {
+ LOGGER.info("IoTConsensusV2: will retry with increasing interval.
status: {}", status);
+ throw new
PipeConsensusRetryWithIncreasingIntervalException(exceptionMessage);
+ }
+
switch (status.getCode()) {
case 200: // SUCCESS_STATUS
case 400: // REDIRECTION_RECOMMEND
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index 19a6456ec30..c0ea52e6b74 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -19,12 +19,20 @@
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
public class RetryUtils {
public interface CallableWithException<T, E extends Exception> {
T call() throws E;
}
+ public static boolean needRetryForConsensus(int statusCode) {
+ return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
+ || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+ || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
+ }
+
public static final int MAX_RETRIES = 3;
public static <T, E extends Exception> T retryOnException(