This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d34054dec5e Pipe: Fix the problem that a cluster crash in the
multi-cluster receiving end causes all synchronization to be suspended (#15962)
d34054dec5e is described below
commit d34054dec5e7bdaa472ca134a9e2cebe93c1772f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jul 22 10:40:30 2025 +0800
Pipe: Fix the problem that a cluster crash in the multi-cluster receiving
end causes all synchronization to be suspended (#15962)
* Pipe: Fix the problem that a cluster crash in the multi-cluster receiving
end causes all synchronization to be suspended
* add IsEndPointAlive Function
* add IsEndPointAlive Function
* add IsEndPointAlive Function
* add pipeCheckAllSyncClientLiveTimeIntervalMs config
* fix
* fix
* fix
* fix
---
.../client/IoTDBDataNodeAsyncClientManager.java | 49 +++++++++++++++++++++-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 5 +++
.../apache/iotdb/commons/conf/CommonConfig.java | 16 +++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 7 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++
.../connector/client/IoTDBSyncClientManager.java | 14 +++++++
6 files changed, 95 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 75d6a08a664..d4c044071e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -85,6 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private volatile boolean isClosed = false;
+  // Endpoints whose most recent handshake failed, mapped to the wall-clock time
+  // (System.currentTimeMillis()) at which they were marked unhealthy. The borrowClient paths
+  // consult isUnhealthy() to avoid these endpoints; an entry is dropped once
+  // PipeCheckAllSyncClientLiveTimeIntervalMs has elapsed or a later handshake succeeds.
+  private final Map<TEndPoint, Long> unhealthyEndPointMap = new ConcurrentHashMap<>();
+
public IoTDBDataNodeAsyncClientManager(
final List<TEndPoint> endPoints,
/* The following parameters are used locally. */
@@ -179,7 +181,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
public AsyncPipeDataTransferServiceClient borrowClient(final TEndPoint
endPoint)
throws Exception {
- if (!useLeaderCache || Objects.isNull(endPoint)) {
+ if (!useLeaderCache || Objects.isNull(endPoint) || isUnhealthy(endPoint)) {
return borrowClient();
}
@@ -313,10 +315,14 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
waitHandshakeFinished(isHandshakeFinished);
}
if (exception.get() != null) {
+ markUnhealthy(targetNodeUrl);
throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ } else {
+ markHealthy(targetNodeUrl);
}
} catch (TException e) {
client.resetMethodStateIfStopped();
+ markUnhealthy(targetNodeUrl);
throw e;
} finally {
if (isClosed) {
@@ -423,8 +429,14 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
@Override
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
final int clientSize = endPointList.size();
+ long n = 0;
while (true) {
final TEndPoint targetNodeUrl = endPointList.get((int)
(currentClientIndex++ % clientSize));
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
+ n++;
+ continue;
+ }
+
final AsyncPipeDataTransferServiceClient client =
endPoint2Client.borrowClient(targetNodeUrl);
if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -438,8 +450,15 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
@Override
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
final int clientSize = endPointList.size();
+ long n = 0;
+
while (true) {
final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random()
* clientSize));
+ if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ n++;
+ continue;
+ }
+
final AsyncPipeDataTransferServiceClient client =
endPoint2Client.borrowClient(targetNodeUrl);
if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -452,8 +471,15 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private class PriorityLoadBalancer implements LoadBalancer {
@Override
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+ final int clientSize = endPointList.size();
+ long n = 0;
while (true) {
for (final TEndPoint targetNodeUrl : endPointList) {
+ if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ n++;
+ continue;
+ }
+
final AsyncPipeDataTransferServiceClient client =
endPoint2Client.borrowClient(targetNodeUrl);
if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -463,4 +489,25 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
}
}
+
+  /**
+   * Returns whether {@code endPoint} should currently be avoided, i.e. it was marked unhealthy
+   * less than {@code PipeCheckAllSyncClientLiveTimeIntervalMs} milliseconds ago.
+   *
+   * <p>When the interval has elapsed, the mark is cleared so the endpoint is tried again. The
+   * clearing is a conditional remove keyed on the timestamp observed here. If another thread
+   * re-marked the endpoint with a newer timestamp in the meantime (after a fresh handshake
+   * failure), that newer mark is preserved instead of being silently discarded.
+   *
+   * @param endPoint the receiver endpoint to check; must be non-null
+   * @return {@code true} if the endpoint is marked unhealthy and the mark has not yet expired
+   */
+  private boolean isUnhealthy(final TEndPoint endPoint) {
+    final Long downTime = unhealthyEndPointMap.get(endPoint);
+    if (downTime == null) {
+      return false;
+    }
+    if (System.currentTimeMillis() - downTime
+        > PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) {
+      // Remove only the exact entry observed above; an unconditional remove could erase a
+      // newer mark written concurrently by markUnhealthy().
+      unhealthyEndPointMap.remove(endPoint, downTime);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Marks {@code endPoint} as unhealthy as of the current wall-clock time, overwriting any
+   * earlier mark.
+   *
+   * @param endPoint the endpoint to avoid until the re-check interval elapses
+   */
+  private void markUnhealthy(final TEndPoint endPoint) {
+    final long markedAtMs = System.currentTimeMillis();
+    unhealthyEndPointMap.put(endPoint, markedAtMs);
+  }
+
+  /**
+   * Clears any unhealthy mark for {@code endPoint}; does nothing if it is not marked.
+   *
+   * @param endPoint the endpoint to treat as healthy again
+   */
+  private void markHealthy(final TEndPoint endPoint) {
+    this.unhealthyEndPointMap.remove(endPoint);
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 391b1769c2d..1075cebac60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
+import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
@@ -623,4 +624,8 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
super.close();
}
+
+  /**
+   * Returns the sync client manager held by this connector.
+   *
+   * @return the connector's {@link IoTDBDataNodeSyncClientManager}
+   */
+  public IoTDBDataNodeSyncClientManager getClientManager() {
+    return this.clientManager;
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 78dbe312b8b..a2bd1ee0f28 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -325,6 +325,7 @@ public class CommonConfig {
private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
= 0.8d;
private boolean pipeTransferTsFileSync = false;
+  // Interval in milliseconds used by pipe connectors both to keep avoiding receiver endpoints
+  // whose handshake recently failed and to skip full sync-client reconstruction while at least
+  // one sync client is still alive.
+  private long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5 minutes
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -1887,6 +1888,21 @@ public class CommonConfig {
logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
}
+  /**
+   * Returns the pipe client liveness re-check interval.
+   *
+   * @return the interval in milliseconds
+   */
+  public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
+    return this.pipeCheckAllSyncClientLiveTimeIntervalMs;
+  }
+
+  /**
+   * Updates the pipe client liveness re-check interval and logs the new value when it changes.
+   *
+   * <p>The parameter name and log text use the same spelling as the field, the getter and the
+   * {@code pipe_check_all_sync_client_live_time_interval_ms} property key. The previous
+   * "SyncAll" spelling matched none of them.
+   *
+   * @param pipeCheckAllSyncClientLiveTimeIntervalMs the new interval in milliseconds
+   */
+  public void setPipeCheckAllSyncClientLiveTimeIntervalMs(
+      long pipeCheckAllSyncClientLiveTimeIntervalMs) {
+    if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == pipeCheckAllSyncClientLiveTimeIntervalMs) {
+      return;
+    }
+    this.pipeCheckAllSyncClientLiveTimeIntervalMs = pipeCheckAllSyncClientLiveTimeIntervalMs;
+    logger.info(
+        "pipeCheckAllSyncClientLiveTimeIntervalMs is set to {}",
+        pipeCheckAllSyncClientLiveTimeIntervalMs);
+  }
+
public double getPipeSendTsFileRateLimitBytesPerSecond() {
return pipeSendTsFileRateLimitBytesPerSecond;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index f845ca7b75c..ca99d29a1d0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -289,6 +289,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeTransferTsFileSync();
}
+  /**
+   * Returns the pipe client liveness re-check interval, delegating to the common configuration.
+   *
+   * @return the interval in milliseconds
+   */
+  public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
+    final long intervalMs = COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs();
+    return intervalMs;
+  }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -512,6 +516,9 @@ public class PipeConfig {
"PipePipeRemainingInsertEventCountAverage: {}",
getPipeRemainingInsertNodeCountEMAAlpha());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
+ LOGGER.info(
+ "PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
+ getPipeCheckAllSyncClientLiveTimeIntervalMs());
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}",
getPipeDynamicMemoryHistoryWeight());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 5a8c0090c8d..3e31864e3d8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -494,6 +494,11 @@ public class PipeDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"pipe_transfer_tsfile_sync",
String.valueOf(config.getPipeTransferTsFileSync()))));
+ config.setPipeCheckAllSyncClientLiveTimeIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_check_all_sync_client_live_time_interval_ms",
+
String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs()))));
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 17a48e1dd9d..b87b9c59060 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -59,6 +59,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
new ConcurrentHashMap<>();
private final Map<TEndPoint, String> endPoint2HandshakeErrorMessage = new
ConcurrentHashMap<>();
+  // Wall-clock time (ms) of the last checkClientStatusAndTryReconstructIfNecessary() run that
+  // found at least one live client after reconstructing dead ones. Within
+  // PipeCheckAllSyncClientLiveTimeIntervalMs of this time, reconstruction is skipped as long as
+  // some client is still alive.
+  private volatile long lastCheckClientStatusTimestamp = 0L;
+
private final LoadBalancer loadBalancer;
protected IoTDBSyncClientManager(
@@ -113,6 +115,17 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
}
public void checkClientStatusAndTryReconstructIfNecessary() {
+ if (System.currentTimeMillis() - lastCheckClientStatusTimestamp
+ <
PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) {
+ // Check whether any clients are available, if any client is available,
return directly
+ for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
+ endPoint2ClientAndStatus.values()) {
+ if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ return;
+ }
+ }
+ }
+
// Reconstruct all dead clients
for (final Map.Entry<TEndPoint, Pair<IoTDBSyncClient, Boolean>> entry :
endPoint2ClientAndStatus.entrySet()) {
@@ -126,6 +139,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
// Check whether any clients are available
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
endPoint2ClientAndStatus.values()) {
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ lastCheckClientStatusTimestamp = System.currentTimeMillis();
return;
}
}