This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 42b110729bf4f2fcd038ef216beb37219fe7a774
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jul 22 11:34:56 2025 +0800

    Pipe: Fix the problem that a cluster crash in the multi-cluster receiving 
end causes all synchronization to be suspended (#15962) (#15991)
    
    * Pipe: Fix the problem that a cluster crash in the multi-cluster receiving 
end causes all synchronization to be suspended
    
    * add IsEndPointAlive Function
    
    * add IsEndPointAlive Function
    
    * add IsEndPointAlive Function
    
    * add pipeCheckSyncAllClientLiveTimeIntervalMs config
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    (cherry picked from commit d34054dec5e7bdaa472ca134a9e2cebe93c1772f)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    | 49 +++++++++++++++++++++-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  5 +++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 16 +++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 ++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  5 +++
 .../connector/client/IoTDBSyncClientManager.java   | 14 +++++++
 6 files changed, 95 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 2fc41a61866..522bda0f6dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -85,6 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
   private volatile boolean isClosed = false;
 
+  private final Map<TEndPoint, Long> unhealthyEndPointMap = new 
ConcurrentHashMap<>();
+
   public IoTDBDataNodeAsyncClientManager(
       final List<TEndPoint> endPoints,
       /* The following parameters are used locally. */
@@ -179,7 +181,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
   public AsyncPipeDataTransferServiceClient borrowClient(final TEndPoint 
endPoint)
       throws Exception {
-    if (!useLeaderCache || Objects.isNull(endPoint)) {
+    if (!useLeaderCache || Objects.isNull(endPoint) || isUnhealthy(endPoint)) {
       return borrowClient();
     }
 
@@ -313,10 +315,14 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         waitHandshakeFinished(isHandshakeFinished);
       }
       if (exception.get() != null) {
+        markUnhealthy(targetNodeUrl);
         throw new PipeConnectionException("Failed to handshake.", 
exception.get());
+      } else {
+        markHealthy(targetNodeUrl);
       }
     } catch (TException e) {
       client.resetMethodStateIfStopped();
+      markUnhealthy(targetNodeUrl);
       throw e;
     } finally {
       client.setShouldReturnSelf(true);
@@ -405,8 +411,14 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
     @Override
     public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
       final int clientSize = endPointList.size();
+      long n = 0;
       while (true) {
         final TEndPoint targetNodeUrl = endPointList.get((int) 
(currentClientIndex++ % clientSize));
+        if (isUnhealthy(targetNodeUrl) && n < clientSize) {
+          n++;
+          continue;
+        }
+
         final AsyncPipeDataTransferServiceClient client =
             endPoint2Client.borrowClient(targetNodeUrl);
         if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -420,8 +432,15 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
     @Override
     public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
       final int clientSize = endPointList.size();
+      long n = 0;
+
       while (true) {
         final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() 
* clientSize));
+        if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+          n++;
+          continue;
+        }
+
         final AsyncPipeDataTransferServiceClient client =
             endPoint2Client.borrowClient(targetNodeUrl);
         if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -434,8 +453,15 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
   private class PriorityLoadBalancer implements LoadBalancer {
     @Override
     public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+      final int clientSize = endPointList.size();
+      long n = 0;
       while (true) {
         for (final TEndPoint targetNodeUrl : endPointList) {
+          if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+            n++;
+            continue;
+          }
+
           final AsyncPipeDataTransferServiceClient client =
               endPoint2Client.borrowClient(targetNodeUrl);
           if (handshakeIfNecessary(targetNodeUrl, client)) {
@@ -445,4 +471,25 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       }
     }
   }
+
+  /**
+   * Tells whether {@code endPoint} was recently marked unhealthy and should be skipped.
+   *
+   * <p>An end point stays unhealthy from the moment {@link #markUnhealthy} records it until
+   * more than {@code getPipeCheckAllSyncClientLiveTimeIntervalMs()} milliseconds have passed.
+   * Once that window is over the stale record is dropped so the end point is tried again.
+   *
+   * @param endPoint the end point to look up
+   * @return {@code true} if the end point is still inside its unhealthy window
+   */
+  private boolean isUnhealthy(final TEndPoint endPoint) {
+    final Long downTime = unhealthyEndPointMap.get(endPoint);
+    if (downTime == null) {
+      return false;
+    }
+    if (System.currentTimeMillis() - downTime
+        > PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) {
+      // Drop only the record that was just judged stale. An unconditional remove could erase
+      // a fresher timestamp stored by a concurrent markUnhealthy() after the get() above.
+      unhealthyEndPointMap.remove(endPoint, downTime);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Records the current wall-clock time as the moment {@code endPoint} was seen failing,
+   * replacing any earlier record for the same end point.
+   *
+   * @param endPoint the end point that just failed
+   */
+  private void markUnhealthy(final TEndPoint endPoint) {
+    unhealthyEndPointMap.put(endPoint, System.currentTimeMillis());
+  }
+
+  /**
+   * Clears any unhealthy record for {@code endPoint}; a no-op if none exists.
+   *
+   * @param endPoint the end point that just succeeded
+   */
+  private void markHealthy(final TEndPoint endPoint) {
+    unhealthyEndPointMap.remove(endPoint);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index d996cac2820..d7d19bdf07e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.RetryUtils;
+import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
@@ -522,4 +523,8 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     super.close();
   }
+
+  public IoTDBDataNodeSyncClientManager getClientManager() {
+    return clientManager; // declared outside this hunk; the reference is returned as-is
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index dbe81b6e2ef..f716f3b302e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -331,6 +331,7 @@ public class CommonConfig {
   private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
   private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold 
= 0.8d;
   private boolean pipeTransferTsFileSync = false;
+  private long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5 
minutes
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -1935,6 +1936,21 @@ public class CommonConfig {
     logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
   }
 
+  public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
+    return pipeCheckAllSyncClientLiveTimeIntervalMs; // milliseconds; field default is 5 minutes
+  }
+
+  /**
+   * Updates {@code pipeCheckAllSyncClientLiveTimeIntervalMs} and logs the new value.
+   *
+   * <p>Setting the value the field already holds is a no-op and is not logged.
+   *
+   * @param pipeCheckAllSyncClientLiveTimeIntervalMs the new interval, in milliseconds
+   */
+  public void setPipeCheckAllSyncClientLiveTimeIntervalMs(
+      long pipeCheckAllSyncClientLiveTimeIntervalMs) {
+    if (this.pipeCheckAllSyncClientLiveTimeIntervalMs
+        == pipeCheckAllSyncClientLiveTimeIntervalMs) {
+      return;
+    }
+    this.pipeCheckAllSyncClientLiveTimeIntervalMs = pipeCheckAllSyncClientLiveTimeIntervalMs;
+    // Log under the field's real name ("AllSync", not "SyncAll") so the message matches the
+    // getter, the setter and the pipe_check_all_sync_client_live_time_interval_ms property.
+    logger.info(
+        "pipeCheckAllSyncClientLiveTimeIntervalMs is set to {}",
+        pipeCheckAllSyncClientLiveTimeIntervalMs);
+  }
+
   public double getPipeAllSinksRateLimitBytesPerSecond() {
     return pipeAllSinksRateLimitBytesPerSecond;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 89f5187eef9..78af97331b9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -285,6 +285,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTransferTsFileSync();
   }
 
+  public long getPipeCheckAllSyncClientLiveTimeIntervalMs() { // delegates to COMMON_CONFIG
+    return COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -508,6 +512,9 @@ public class PipeConfig {
         "PipePipeRemainingInsertEventCountAverage: {}", 
getPipeRemainingInsertNodeCountEMAAlpha());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
     LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
+    LOGGER.info(
+        "PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
+        getPipeCheckAllSyncClientLiveTimeIntervalMs());
 
     LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", 
getPipeDynamicMemoryHistoryWeight());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 31d9f1ec3ce..27ea02c8825 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -509,6 +509,11 @@ public class PipeDescriptor {
         Boolean.parseBoolean(
             properties.getProperty(
                 "pipe_transfer_tsfile_sync", 
String.valueOf(config.getPipeTransferTsFileSync()))));
+    config.setPipeCheckAllSyncClientLiveTimeIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_check_all_sync_client_live_time_interval_ms",
+                
String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs()))));
 
     config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 00fc35653d8..0b21abbd406 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -59,6 +59,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       new ConcurrentHashMap<>();
   private final Map<TEndPoint, String> endPoint2HandshakeErrorMessage = new 
ConcurrentHashMap<>();
 
+  private volatile long lastCheckClientStatusTimestamp = 0L;
+
   private final LoadBalancer loadBalancer;
 
   protected IoTDBSyncClientManager(
@@ -111,6 +113,17 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
+    if (System.currentTimeMillis() - lastCheckClientStatusTimestamp
+        < 
PipeConfig.getInstance().getPipeCheckAllSyncClientLiveTimeIntervalMs()) {
+      // Check whether any clients are available, if any client is available, 
return directly
+      for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
+          endPoint2ClientAndStatus.values()) {
+        if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+          return;
+        }
+      }
+    }
+
     // Reconstruct all dead clients
     for (final Map.Entry<TEndPoint, Pair<IoTDBSyncClient, Boolean>> entry :
         endPoint2ClientAndStatus.entrySet()) {
@@ -124,6 +137,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
     // Check whether any clients are available
     for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus : 
endPoint2ClientAndStatus.values()) {
       if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+        lastCheckClientStatusTimestamp = System.currentTimeMillis();
         return;
       }
     }

Reply via email to