This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 42b110729bf4f2fcd038ef216beb37219fe7a774 Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Jul 22 11:34:56 2025 +0800 Pipe: Fix the problem that a cluster crash in the multi-cluster receiving end causes all synchronization to be suspended (#15962) (#15991) * Pipe: Fix the problem that a cluster crash in the multi-cluster receiving end causes all synchronization to be suspended * add IsEndPointAlive Function * add IsEndPointAlive Function * add IsEndPointAlive Function * add pipeCheckSyncAllClientLiveTimeIntervalMs config * fix * fix * fix * fix (cherry picked from commit d34054dec5e7bdaa472ca134a9e2cebe93c1772f) --- .../client/IoTDBDataNodeAsyncClientManager.java | 49 +++++++++++++++++++++- .../thrift/sync/IoTDBDataRegionSyncConnector.java | 5 +++ .../apache/iotdb/commons/conf/CommonConfig.java | 16 +++++++ .../iotdb/commons/pipe/config/PipeConfig.java | 7 ++++ .../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++ .../connector/client/IoTDBSyncClientManager.java | 14 +++++++ 6 files changed, 95 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 2fc41a61866..522bda0f6dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -85,6 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private volatile boolean isClosed = false; + private final Map<TEndPoint, Long> unhealthyEndPointMap = new ConcurrentHashMap<>(); + public IoTDBDataNodeAsyncClientManager( final List<TEndPoint> endPoints, /* The following parameters are used locally. */ @@ -179,7 +181,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager public AsyncPipeDataTransferServiceClient borrowClient(final TEndPoint endPoint) throws Exception { - if (!useLeaderCache || Objects.isNull(endPoint)) { + if (!useLeaderCache || Objects.isNull(endPoint) || isUnhealthy(endPoint)) { return borrowClient(); } @@ -313,10 +315,14 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager waitHandshakeFinished(isHandshakeFinished); } if (exception.get() != null) { + markUnhealthy(targetNodeUrl); throw new PipeConnectionException("Failed to handshake.", exception.get()); + } else { + markHealthy(targetNodeUrl); } } catch (TException e) { client.resetMethodStateIfStopped(); + markUnhealthy(targetNodeUrl); throw e; } finally { client.setShouldReturnSelf(true); @@ -405,8 +411,14 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager @Override public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { final int clientSize = endPointList.size(); + long n = 0; while (true) { final TEndPoint targetNodeUrl = endPointList.get((int) (currentClientIndex++ % clientSize)); + if (isUnhealthy(targetNodeUrl) && n < clientSize) { + n++; + continue; + } + final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); if (handshakeIfNecessary(targetNodeUrl, client)) { @@ -420,8 +432,15 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager @Override public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { final int clientSize = endPointList.size(); + long n = 0; + while (true) { final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize)); + if (isUnhealthy(targetNodeUrl) && n <= clientSize) { + n++; + continue; + } + final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); if (handshakeIfNecessary(targetNodeUrl, client)) { @@ -434,8 +453,15 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private class PriorityLoadBalancer implements LoadBalancer { @Override public AsyncPipeDataTransferServiceClient borrowClient() throws Exception { + final int clientSize = endPointList.size(); + long n = 0; while (true) { for (final TEndPoint targetNodeUrl : endPointList) { + if (isUnhealthy(targetNodeUrl) && n <= clientSize) { + n++; + continue; + } + final AsyncPipeDataTransferServiceClient client = endPoint2Client.borrowClient(targetNodeUrl); if (handshakeIfNecessary(targetNodeUrl, client)) { @@ -445,4 +471,25 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager } } } + + private boolean isUnhealthy(TEndPoint endPoint) { + Long downTime = unhealthyEndPointMap.get(endPoint); + if (downTime == null) { + return false; + } + if (System.currentTimeMillis() - downTime + > PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) { + markHealthy(endPoint); + return false; + } + return true; + } + + private void markUnhealthy(TEndPoint endPoint) { + unhealthyEndPointMap.put(endPoint, System.currentTimeMillis()); + } + + private void markHealthy(TEndPoint endPoint) { + unhealthyEndPointMap.remove(endPoint); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index d996cac2820..d7d19bdf07e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.RetryUtils; +import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; @@ -522,4 +523,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { super.close(); } + + public IoTDBDataNodeSyncClientManager getClientManager() { + return clientManager; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index dbe81b6e2ef..f716f3b302e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -331,6 +331,7 @@ public class CommonConfig { private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d; private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d; private boolean pipeTransferTsFileSync = false; + private long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5 minutes private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes @@ -1935,6 +1936,21 @@ public class CommonConfig { logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync); } + public long getPipeCheckAllSyncClientLiveTimeIntervalMs() { + return pipeCheckAllSyncClientLiveTimeIntervalMs; + } + + public void setPipeCheckAllSyncClientLiveTimeIntervalMs( + long pipeCheckSyncAllClientLiveTimeIntervalMs) { + if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == pipeCheckSyncAllClientLiveTimeIntervalMs) { + return; + } + this.pipeCheckAllSyncClientLiveTimeIntervalMs = pipeCheckSyncAllClientLiveTimeIntervalMs; + logger.info( + "pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}", + pipeCheckSyncAllClientLiveTimeIntervalMs); + } + public double getPipeAllSinksRateLimitBytesPerSecond() { return pipeAllSinksRateLimitBytesPerSecond; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 89f5187eef9..78af97331b9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -285,6 +285,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeTransferTsFileSync(); } + public long getPipeCheckAllSyncClientLiveTimeIntervalMs() { + return COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs(); + } + /////////////////////////////// Meta Consistency /////////////////////////////// public boolean isSeperatedPipeHeartbeatEnabled() { @@ -508,6 +512,9 @@ public class PipeConfig { "PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountEMAAlpha()); LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold()); LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync()); + LOGGER.info( + "PipeCheckAllSyncClientLiveTimeIntervalMs: {}", + getPipeCheckAllSyncClientLiveTimeIntervalMs()); LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight()); LOGGER.info( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 31d9f1ec3ce..27ea02c8825 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -509,6 +509,11 @@ public class PipeDescriptor { Boolean.parseBoolean( properties.getProperty( "pipe_transfer_tsfile_sync", String.valueOf(config.getPipeTransferTsFileSync())))); + config.setPipeCheckAllSyncClientLiveTimeIntervalMs( + Long.parseLong( + properties.getProperty( + "pipe_check_all_sync_client_live_time_interval_ms", + String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs())))); config.setPipeRemainingTimeCommitRateAutoSwitchSeconds( Long.parseLong( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index 00fc35653d8..0b21abbd406 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -59,6 +59,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen new ConcurrentHashMap<>(); private final Map<TEndPoint, String> endPoint2HandshakeErrorMessage = new ConcurrentHashMap<>(); + private volatile long lastCheckClientStatusTimestamp = 0L; + private final LoadBalancer loadBalancer; protected IoTDBSyncClientManager( @@ -111,6 +113,17 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen } public void checkClientStatusAndTryReconstructIfNecessary() { + if (System.currentTimeMillis() - lastCheckClientStatusTimestamp + < PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) { + // Check whether any clients are available, if any client is available, return directly + for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus : + endPoint2ClientAndStatus.values()) { + if (Boolean.TRUE.equals(clientAndStatus.getRight())) { + return; + } + } + } + // Reconstruct all dead clients for (final Map.Entry<TEndPoint, Pair<IoTDBSyncClient, Boolean>> entry : endPoint2ClientAndStatus.entrySet()) { @@ -124,6 +137,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen // Check whether any clients are available for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus : endPoint2ClientAndStatus.values()) { if (Boolean.TRUE.equals(clientAndStatus.getRight())) { + lastCheckClientStatusTimestamp = System.currentTimeMillis(); return; } }
