This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 670b45675e0 Pipe: Fix HA issues caused by exceptions not handled in handshake (IoTDBAirGapConnector / IoTDBSyncClientManager) (#14706)
670b45675e0 is described below

commit 670b45675e08eb148a4f7d919b3e1d11e2566923
Author: Zhenyu Luo <[email protected]>
AuthorDate: Sun Jan 19 14:00:07 2025 +0800

    Pipe: Fix HA issues caused by exceptions not handled in handshake (IoTDBAirGapConnector / IoTDBSyncClientManager) (#14706)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  40 ++++++++
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   7 ++
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   4 +
 .../org/apache/iotdb/db/it/utils/TestUtils.java    | 105 ++++++++++++++++++++-
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     |  98 +++++++++++++++++++
 .../pipe/it/tablemodel/IoTDBPipeClusterIT.java     |  65 +++++++++++++
 .../iotdb/pipe/it/tablemodel/TableModelUtils.java  |  15 +++
 .../client/IoTDBDataNodeAsyncClientManager.java    |   2 +-
 .../connector/client/IoTDBSyncClientManager.java   |  28 ++++--
 .../connector/protocol/IoTDBAirGapConnector.java   |  10 +-
 10 files changed, 356 insertions(+), 18 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index aeb34a2a7b9..acaf76e4514 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -443,6 +443,19 @@ public abstract class AbstractEnv implements BaseEnv {
         getReadConnections(null, username, password, sqlDialect));
   }
 
+  @Override
+  public Connection getConnection(
+      final DataNodeWrapper dataNodeWrapper,
+      final String username,
+      final String password,
+      final String sqlDialect)
+      throws SQLException {
+    return new ClusterTestConnection(
+        getWriteConnectionWithSpecifiedDataNode(
+            dataNodeWrapper, null, username, password, sqlDialect),
+        getReadConnections(null, dataNodeWrapper, username, password, 
sqlDialect));
+  }
+
   @Override
   public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
       final DataNodeWrapper dataNode,
@@ -706,6 +719,33 @@ public abstract class AbstractEnv implements BaseEnv {
     return readConnRequestDelegate.requestAll();
   }
 
+  protected List<NodeConnection> getReadConnections(
+      final Constant.Version version,
+      final DataNodeWrapper dataNode,
+      final String username,
+      final String password,
+      final String sqlDialect)
+      throws SQLException {
+    final List<String> endpoints = new ArrayList<>();
+    final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
+        new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+
+    endpoints.add(dataNode.getIpAndPortString());
+    readConnRequestDelegate.addRequest(
+        () ->
+            new NodeConnection(
+                dataNode.getIpAndPortString(),
+                NodeConnection.NodeRole.DATA_NODE,
+                NodeConnection.ConnectionRole.READ,
+                DriverManager.getConnection(
+                    Config.IOTDB_URL_PREFIX
+                        + dataNode.getIpAndPortString()
+                        + getParam(version, NODE_NETWORK_TIMEOUT_MS, 
ZERO_TIME_ZONE),
+                    BaseEnv.constructProperties(username, password, 
sqlDialect))));
+
+    return readConnRequestDelegate.requestAll();
+  }
+
   // use this to avoid some runtimeExceptions when try to get jdbc connections.
   // because it is hard to add retry and handle exception when getting jdbc 
connections in
   // getWriteConnectionWithSpecifiedDataNode and getReadConnections.
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index bf460fa826c..b29c5b96315 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -177,6 +177,13 @@ public class RemoteServerEnv implements BaseEnv {
     return connection;
   }
 
+  @Override
+  public Connection getConnection(
+      DataNodeWrapper dataNodeWrapper, String username, String password, 
String sqlDialect)
+      throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
   public void setTestMethodName(String testCaseName) {
     // Do nothing
   }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 1aa53cdbab1..ea044ddfd9d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -141,6 +141,10 @@ public interface BaseEnv {
       Constant.Version version, String username, String password, String 
sqlDialect)
       throws SQLException;
 
+  Connection getConnection(
+      DataNodeWrapper dataNodeWrapper, String username, String password, 
String sqlDialect)
+      throws SQLException;
+
   default Connection getConnection(String username, String password) throws 
SQLException {
     return getConnection(username, password, TREE_SQL_DIALECT);
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 723dabc95f4..5cd9c63d527 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -57,6 +57,7 @@ import java.util.function.Consumer;
 import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
 import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
 import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.iotdb.itbase.env.BaseEnv.TREE_SQL_DIALECT;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -704,7 +705,7 @@ public class TestUtils {
         SessionConfig.DEFAULT_USER,
         SessionConfig.DEFAULT_PASSWORD,
         null,
-        BaseEnv.TREE_SQL_DIALECT);
+        TREE_SQL_DIALECT);
   }
 
   public static boolean tryExecuteNonQueriesWithRetry(
@@ -722,8 +723,7 @@ public class TestUtils {
   // Instead, it returns a flag to indicate the result of the execution.
   public static boolean tryExecuteNonQueriesWithRetry(
       BaseEnv env, List<String> sqlList, String userName, String password) {
-    return tryExecuteNonQueriesWithRetry(
-        env, sqlList, userName, password, null, BaseEnv.TREE_SQL_DIALECT);
+    return tryExecuteNonQueriesWithRetry(env, sqlList, userName, password, 
null, TREE_SQL_DIALECT);
   }
 
   public static boolean tryExecuteNonQueriesWithRetry(
@@ -741,7 +741,7 @@ public class TestUtils {
                   password,
                   BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect)
                       ? BaseEnv.TABLE_SQL_DIALECT
-                      : BaseEnv.TREE_SQL_DIALECT);
+                      : TREE_SQL_DIALECT);
           Statement statement = connection.createStatement()) {
         if (BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect) && dataBase != null) {
           statement.execute("use " + dataBase);
@@ -1053,6 +1053,41 @@ public class TestUtils {
     }
   }
 
+  public static void assertDataEventuallyOnEnv(
+      BaseEnv env,
+      DataNodeWrapper dataNodeWrapper,
+      String sql,
+      String expectedHeader,
+      Set<String> expectedResSet,
+      long timeoutSeconds) {
+    try (Connection connection =
+            env.getConnection(
+                dataNodeWrapper,
+                SessionConfig.DEFAULT_USER,
+                SessionConfig.DEFAULT_PASSWORD,
+                TREE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      // Keep retrying if there are execution failures
+      await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
+          .atMost(timeoutSeconds, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try {
+                  TestUtils.assertResultSetEqual(
+                      executeQueryWithRetry(statement, sql), expectedHeader, 
expectedResSet);
+                } catch (Exception e) {
+                  Assert.fail();
+                }
+              });
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   public static void assertDataSizeEventuallyOnEnv(
       final BaseEnv env, final String sql, final int size, final String 
databaseName) {
     assertDataSizeEventuallyOnEnv(env, sql, size, 600, databaseName);
@@ -1166,6 +1201,17 @@ public class TestUtils {
         env, sql, expectedHeader, expectedResSet, 600, dataBaseName, 
handleFailure);
   }
 
+  public static void assertDataEventuallyOnEnv(
+      final BaseEnv env,
+      final DataNodeWrapper dataNodeWrapper,
+      final String sql,
+      final String expectedHeader,
+      final Set<String> expectedResSet,
+      final String dataBaseName) {
+    assertDataEventuallyOnEnv(
+        env, dataNodeWrapper, sql, expectedHeader, expectedResSet, 600, 
dataBaseName, null);
+  }
+
   public static void assertDataEventuallyOnEnv(
       final BaseEnv env,
       final String sql,
@@ -1218,6 +1264,55 @@ public class TestUtils {
     }
   }
 
+  public static void assertDataEventuallyOnEnv(
+      final BaseEnv env,
+      final DataNodeWrapper dataNodeWrapper,
+      final String sql,
+      final String expectedHeader,
+      final Set<String> expectedResSet,
+      final long timeoutSeconds,
+      final String databaseName,
+      final Consumer<String> handleFailure) {
+    try (Connection connection =
+            env.getConnection(
+                dataNodeWrapper,
+                SessionConfig.DEFAULT_USER,
+                SessionConfig.DEFAULT_PASSWORD,
+                BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      // Keep retrying if there are execution failures
+      await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
+          .atMost(timeoutSeconds, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try {
+                  if (databaseName != null) {
+                    statement.execute("use " + databaseName);
+                  }
+                  if (sql != null && !sql.isEmpty()) {
+                    TestUtils.assertResultSetEqual(
+                        executeQueryWithRetry(statement, sql), expectedHeader, 
expectedResSet);
+                  }
+                } catch (Exception e) {
+                  if (handleFailure != null) {
+                    handleFailure.accept(e.getMessage());
+                  }
+                  Assert.fail();
+                } catch (Error e) {
+                  if (handleFailure != null) {
+                    handleFailure.accept(e.getMessage());
+                  }
+                  throw e;
+                }
+              });
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
   public static void assertDataEventuallyOnEnv(
       BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
     assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
@@ -1294,7 +1389,7 @@ public class TestUtils {
     final boolean[] flushed = {false};
     try (Connection connection =
             env.getConnection(
-                Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : 
BaseEnv.TABLE_SQL_DIALECT);
+                Objects.isNull(database) ? TREE_SQL_DIALECT : 
BaseEnv.TABLE_SQL_DIALECT);
         Statement statement = connection.createStatement()) {
       // Keep retrying if there are execution failures
       await()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 0d4fc3eea7e..ad8c86ea054 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -80,6 +81,8 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDataReplicationFactor(2)
+        .setSchemaReplicationFactor(3)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
@@ -92,6 +95,101 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
     receiverEnv.initClusterEnvironment(3, 3, 180);
   }
 
+  @Test
+  public void testMachineDowntimeAsync() {
+    testMachineDowntime("iotdb-thrift-connector");
+  }
+
+  @Test
+  public void testMachineDowntimeSync() {
+    testMachineDowntime("iotdb-thrift-sync-connector");
+  }
+
+  private void testMachineDowntime(String sink) {
+    StringBuilder a = new StringBuilder();
+    for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+      a.append(nodeWrapper.getIp()).append(":").append(nodeWrapper.getPort());
+      a.append(",");
+    }
+    a.deleteCharAt(a.length() - 1);
+
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+    TableModelUtils.insertData("test", "test", 0, 1, senderEnv);
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values 
(2010-01-01T10:00:00+08:00, 1)",
+              "insert into root.db.d1(time, s1) values 
(2010-01-02T10:00:00+08:00, 2)",
+              "flush"))) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor", "iotdb-extractor");
+      extractorAttributes.put("capture.tree", "true");
+
+      processorAttributes.put("processor", "do-nothing-processor");
+
+      connectorAttributes.put("connector", sink);
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.node-urls", a.toString());
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      receiverEnv.getDataNodeWrapper(0).stop();
+
+      // Ensure that the kill -9 operation is completed
+      Thread.sleep(5000);
+      for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) 
{
+        if (!nodeWrapper.isAlive()) {
+          continue;
+        }
+        TestUtils.assertDataEventuallyOnEnv(
+            receiverEnv,
+            nodeWrapper,
+            "select count(*) from root.**",
+            "count(root.db.d1.s1),",
+            Collections.singleton("2,"),
+            600);
+      }
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.db.d1(time, s1) values (now(), 3)", 
"flush"))) {
+        return;
+      }
+
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+      if (!nodeWrapper.isAlive()) {
+        continue;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          nodeWrapper,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("3,"),
+          600);
+      return;
+    }
+  }
+
   @Test
   public void testWithAllParametersInLogMode() throws Exception {
     testWithAllParameters("log");
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
index 0e0c97a61da..cab7f31a756 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
@@ -78,6 +78,8 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelTestIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDataReplicationFactor(2)
+        .setSchemaReplicationFactor(3)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
@@ -90,6 +92,69 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelTestIT {
     receiverEnv.initClusterEnvironment(3, 3, 180);
   }
 
+  @Test
+  public void testMachineDowntimeAsync() {
+    testMachineDowntime("iotdb-thrift-connector");
+  }
+
+  @Test
+  public void testMachineDowntimeSync() {
+    testMachineDowntime("iotdb-thrift-sync-connector");
+  }
+
+  private void testMachineDowntime(String sink) {
+    StringBuilder a = new StringBuilder();
+    for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+      a.append(nodeWrapper.getIp()).append(":").append(nodeWrapper.getPort());
+      a.append(",");
+    }
+    a.deleteCharAt(a.length() - 1);
+
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+    TableModelUtils.insertData("test", "test", 0, 1, senderEnv);
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor", "iotdb-extractor");
+      extractorAttributes.put("capture.table", "true");
+
+      processorAttributes.put("processor", "do-nothing-processor");
+
+      connectorAttributes.put("connector", sink);
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.node-urls", a.toString());
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      TableModelUtils.assertCountData("test", "test", 1, receiverEnv);
+      receiverEnv.getDataNodeWrapper(0).stop();
+
+      // Ensure that the kill -9 operation is completed
+      Thread.sleep(5000);
+      TableModelUtils.insertData("test", "test", 1, 2, senderEnv);
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+
+    for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+      if (!nodeWrapper.isAlive()) {
+        continue;
+      }
+      TableModelUtils.assertCountData("test", "test", 2, receiverEnv, 
nodeWrapper);
+      return;
+    }
+  }
+
   @Test
   public void testWithAllParametersInStreamingMode() throws Exception {
     testWithAllParameters("true");
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
index e234cc6bd5c..b116cad502c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
@@ -422,6 +422,21 @@ public class TableModelUtils {
         handleFailure);
   }
 
+  public static void assertCountData(
+      final String database,
+      final String table,
+      final int count,
+      final BaseEnv baseEnv,
+      final DataNodeWrapper wrapper) {
+    TestUtils.assertDataEventuallyOnEnv(
+        baseEnv,
+        wrapper,
+        getQueryCountSql(table),
+        "_col0,",
+        Collections.singleton(count + ","),
+        database);
+  }
+
   public static String getDateStr(final int value) {
     Date date = new Date(value);
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 40a84c4128f..6cee85cf2c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -256,7 +256,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       // Retry to handshake by PipeTransferHandshakeV1Req.
       if (resp.get() != null
           && resp.get().getStatus().getCode() == 
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
-        LOGGER.info(
+        LOGGER.warn(
             "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} 
"
                 + "retry to handshake by PipeTransferHandshakeV1Req.",
             targetNodeUrl.getIp(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 1cf5c354d08..78d5b7d5f0a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -125,6 +125,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
         return;
       }
     }
+
+    // If all clients are not available, throw an exception
     final StringBuilder errorMessage =
         new StringBuilder(
             String.format(
@@ -132,7 +134,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
     for (final Map.Entry<TEndPoint, String> entry : 
endPoint2HandshakeErrorMessage.entrySet()) {
       errorMessage
           .append(" (")
-          .append(" host: ")
+          .append("host: ")
           .append(entry.getKey().getIp())
           .append(", port: ")
           .append(entry.getKey().getPort())
@@ -160,11 +162,15 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       }
     }
 
-    initClientAndStatus(clientAndStatus, endPoint);
-    sendHandshakeReq(clientAndStatus);
+    // It is necessary to ensure that the client is initialized successfully 
and not null. If false
+    // is returned, it means that the initialization is not successful and the 
handshake operation
+    // is not performed.
+    if (initClientAndStatus(clientAndStatus, endPoint)) {
+      sendHandshakeReq(clientAndStatus);
+    }
   }
 
-  private void initClientAndStatus(
+  private boolean initClientAndStatus(
       final Pair<IoTDBSyncClient, Boolean> clientAndStatus, final TEndPoint 
endPoint) {
     try {
       clientAndStatus.setLeft(
@@ -179,14 +185,16 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
               useSSL,
               trustStorePath,
               trustStorePwd));
+      return true;
     } catch (Exception e) {
       endPoint2HandshakeErrorMessage.put(endPoint, e.getMessage());
-      throw new PipeConnectionException(
-          String.format(
-              PipeConnectionException.CONNECTION_ERROR_FORMATTER,
-              endPoint.getIp(),
-              endPoint.getPort()),
+      LOGGER.warn(
+          "Failed to initialize client with target server ip: {}, port: {}, 
because {}",
+          endPoint.getIp(),
+          endPoint.getPort(),
+          e.getMessage(),
           e);
+      return false;
     }
   }
 
@@ -211,7 +219,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       // Receiver may be an old version, so we need to retry to handshake by
       // PipeTransferHandshakeV1Req.
       if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
-        LOGGER.info(
+        LOGGER.warn(
             "Handshake error with target server ip: {}, port: {}, because: {}. 
"
                 + "Retry to handshake by PipeTransferHandshakeV1Req.",
             client.getIpAddress(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 3df2af38a34..799913756e4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -189,8 +189,14 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
         continue;
       }
 
-      sendHandshakeReq(socket);
-      isSocketAlive.set(i, true);
+      try {
+        sendHandshakeReq(socket);
+        isSocketAlive.set(i, true);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Handshake error occurs. It may be caused by an error on the 
receiving end. Ignore it.",
+            e);
+      }
     }
 
     for (int i = 0; i < sockets.size(); i++) {

Reply via email to