This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 670b45675e0 Pipe: Fix HA issues caused by exceptions not handled in
handshake (IoTDBAirGapConnector / IoTDBSyncClientManager) (#14706)
670b45675e0 is described below
commit 670b45675e08eb148a4f7d919b3e1d11e2566923
Author: Zhenyu Luo <[email protected]>
AuthorDate: Sun Jan 19 14:00:07 2025 +0800
Pipe: Fix HA issues caused by exceptions not handled in handshake
(IoTDBAirGapConnector / IoTDBSyncClientManager) (#14706)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 40 ++++++++
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 7 ++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 4 +
.../org/apache/iotdb/db/it/utils/TestUtils.java | 105 ++++++++++++++++++++-
.../pipe/it/autocreate/IoTDBPipeClusterIT.java | 98 +++++++++++++++++++
.../pipe/it/tablemodel/IoTDBPipeClusterIT.java | 65 +++++++++++++
.../iotdb/pipe/it/tablemodel/TableModelUtils.java | 15 +++
.../client/IoTDBDataNodeAsyncClientManager.java | 2 +-
.../connector/client/IoTDBSyncClientManager.java | 28 ++++--
.../connector/protocol/IoTDBAirGapConnector.java | 10 +-
10 files changed, 356 insertions(+), 18 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index aeb34a2a7b9..acaf76e4514 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -443,6 +443,19 @@ public abstract class AbstractEnv implements BaseEnv {
getReadConnections(null, username, password, sqlDialect));
}
+ @Override
+ public Connection getConnection(
+ final DataNodeWrapper dataNodeWrapper,
+ final String username,
+ final String password,
+ final String sqlDialect)
+ throws SQLException {
+ return new ClusterTestConnection(
+ getWriteConnectionWithSpecifiedDataNode(
+ dataNodeWrapper, null, username, password, sqlDialect),
+ getReadConnections(null, dataNodeWrapper, username, password, sqlDialect));
+ }
+
@Override
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
final DataNodeWrapper dataNode,
@@ -706,6 +719,33 @@ public abstract class AbstractEnv implements BaseEnv {
return readConnRequestDelegate.requestAll();
}
+ protected List<NodeConnection> getReadConnections(
+ final Constant.Version version,
+ final DataNodeWrapper dataNode,
+ final String username,
+ final String password,
+ final String sqlDialect)
+ throws SQLException {
+ final List<String> endpoints = new ArrayList<>();
+ final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
+ new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
+
+ endpoints.add(dataNode.getIpAndPortString());
+ readConnRequestDelegate.addRequest(
+ () ->
+ new NodeConnection(
+ dataNode.getIpAndPortString(),
+ NodeConnection.NodeRole.DATA_NODE,
+ NodeConnection.ConnectionRole.READ,
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX
+ + dataNode.getIpAndPortString()
+ + getParam(version, NODE_NETWORK_TIMEOUT_MS, ZERO_TIME_ZONE),
+ BaseEnv.constructProperties(username, password, sqlDialect))));
+
+ return readConnRequestDelegate.requestAll();
+ }
+
// use this to avoid some runtimeExceptions when try to get jdbc connections.
// because it is hard to add retry and handle exception when getting jdbc connections in
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index bf460fa826c..b29c5b96315 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -177,6 +177,13 @@ public class RemoteServerEnv implements BaseEnv {
return connection;
}
+ @Override
+ public Connection getConnection(
+ DataNodeWrapper dataNodeWrapper, String username, String password, String sqlDialect)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
public void setTestMethodName(String testCaseName) {
// Do nothing
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 1aa53cdbab1..ea044ddfd9d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -141,6 +141,10 @@ public interface BaseEnv {
Constant.Version version, String username, String password, String sqlDialect)
throws SQLException;
+ Connection getConnection(
+ DataNodeWrapper dataNodeWrapper, String username, String password, String sqlDialect)
+ throws SQLException;
+
default Connection getConnection(String username, String password) throws SQLException {
return getConnection(username, password, TREE_SQL_DIALECT);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 723dabc95f4..5cd9c63d527 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -57,6 +57,7 @@ import java.util.function.Consumer;
import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.iotdb.itbase.env.BaseEnv.TREE_SQL_DIALECT;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -704,7 +705,7 @@ public class TestUtils {
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
null,
- BaseEnv.TREE_SQL_DIALECT);
+ TREE_SQL_DIALECT);
}
public static boolean tryExecuteNonQueriesWithRetry(
@@ -722,8 +723,7 @@ public class TestUtils {
// Instead, it returns a flag to indicate the result of the execution.
public static boolean tryExecuteNonQueriesWithRetry(
BaseEnv env, List<String> sqlList, String userName, String password) {
- return tryExecuteNonQueriesWithRetry(
- env, sqlList, userName, password, null, BaseEnv.TREE_SQL_DIALECT);
+ return tryExecuteNonQueriesWithRetry(env, sqlList, userName, password, null, TREE_SQL_DIALECT);
}
public static boolean tryExecuteNonQueriesWithRetry(
@@ -741,7 +741,7 @@ public class TestUtils {
password,
BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect)
? BaseEnv.TABLE_SQL_DIALECT
- : BaseEnv.TREE_SQL_DIALECT);
+ : TREE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
if (BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect) && dataBase != null) {
statement.execute("use " + dataBase);
@@ -1053,6 +1053,41 @@ public class TestUtils {
}
}
+ public static void assertDataEventuallyOnEnv(
+ BaseEnv env,
+ DataNodeWrapper dataNodeWrapper,
+ String sql,
+ String expectedHeader,
+ Set<String> expectedResSet,
+ long timeoutSeconds) {
+ try (Connection connection =
+ env.getConnection(
+ dataNodeWrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ TREE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
+ .atMost(timeoutSeconds, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try {
+ TestUtils.assertResultSetEqual(
+ executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
public static void assertDataSizeEventuallyOnEnv(
final BaseEnv env, final String sql, final int size, final String databaseName) {
assertDataSizeEventuallyOnEnv(env, sql, size, 600, databaseName);
@@ -1166,6 +1201,17 @@ public class TestUtils {
env, sql, expectedHeader, expectedResSet, 600, dataBaseName, handleFailure);
}
+ public static void assertDataEventuallyOnEnv(
+ final BaseEnv env,
+ final DataNodeWrapper dataNodeWrapper,
+ final String sql,
+ final String expectedHeader,
+ final Set<String> expectedResSet,
+ final String dataBaseName) {
+ assertDataEventuallyOnEnv(
+ env, dataNodeWrapper, sql, expectedHeader, expectedResSet, 600, dataBaseName, null);
+ }
+
public static void assertDataEventuallyOnEnv(
final BaseEnv env,
final String sql,
@@ -1218,6 +1264,55 @@ public class TestUtils {
}
}
+ public static void assertDataEventuallyOnEnv(
+ final BaseEnv env,
+ final DataNodeWrapper dataNodeWrapper,
+ final String sql,
+ final String expectedHeader,
+ final Set<String> expectedResSet,
+ final long timeoutSeconds,
+ final String databaseName,
+ final Consumer<String> handleFailure) {
+ try (Connection connection =
+ env.getConnection(
+ dataNodeWrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
+ .atMost(timeoutSeconds, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try {
+ if (databaseName != null) {
+ statement.execute("use " + databaseName);
+ }
+ if (sql != null && !sql.isEmpty()) {
+ TestUtils.assertResultSetEqual(
+ executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
+ }
+ } catch (Exception e) {
+ if (handleFailure != null) {
+ handleFailure.accept(e.getMessage());
+ }
+ Assert.fail();
+ } catch (Error e) {
+ if (handleFailure != null) {
+ handleFailure.accept(e.getMessage());
+ }
+ throw e;
+ }
+ });
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
@@ -1294,7 +1389,7 @@ public class TestUtils {
final boolean[] flushed = {false};
try (Connection connection =
env.getConnection(
- Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
+ Objects.isNull(database) ? TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 0d4fc3eea7e..ad8c86ea054 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -80,6 +81,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDataReplicationFactor(2)
+ .setSchemaReplicationFactor(3)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
@@ -92,6 +95,101 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
receiverEnv.initClusterEnvironment(3, 3, 180);
}
+ @Test
+ public void testMachineDowntimeAsync() {
+ testMachineDowntime("iotdb-thrift-connector");
+ }
+
+ @Test
+ public void testMachineDowntimeSync() {
+ testMachineDowntime("iotdb-thrift-sync-connector");
+ }
+
+ private void testMachineDowntime(String sink) {
+ StringBuilder a = new StringBuilder();
+ for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+ a.append(nodeWrapper.getIp()).append(":").append(nodeWrapper.getPort());
+ a.append(",");
+ }
+ a.deleteCharAt(a.length() - 1);
+
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+ TableModelUtils.insertData("test", "test", 0, 1, senderEnv);
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)",
+ "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)",
+ "flush"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor", "iotdb-extractor");
+ extractorAttributes.put("capture.tree", "true");
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", sink);
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.node-urls", a.toString());
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ receiverEnv.getDataNodeWrapper(0).stop();
+
+ // Ensure that the kill -9 operation is completed
+ Thread.sleep(5000);
+ for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+ if (!nodeWrapper.isAlive()) {
+ continue;
+ }
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ nodeWrapper,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"),
+ 600);
+ }
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.db.d1(time, s1) values (now(), 3)", "flush"))) {
+ return;
+ }
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+ if (!nodeWrapper.isAlive()) {
+ continue;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ nodeWrapper,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("3,"),
+ 600);
+ return;
+ }
+ }
+
@Test
public void testWithAllParametersInLogMode() throws Exception {
testWithAllParameters("log");
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
index 0e0c97a61da..cab7f31a756 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeClusterIT.java
@@ -78,6 +78,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelTestIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDataReplicationFactor(2)
+ .setSchemaReplicationFactor(3)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
@@ -90,6 +92,69 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelTestIT {
receiverEnv.initClusterEnvironment(3, 3, 180);
}
+ @Test
+ public void testMachineDowntimeAsync() {
+ testMachineDowntime("iotdb-thrift-connector");
+ }
+
+ @Test
+ public void testMachineDowntimeSync() {
+ testMachineDowntime("iotdb-thrift-sync-connector");
+ }
+
+ private void testMachineDowntime(String sink) {
+ StringBuilder a = new StringBuilder();
+ for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+ a.append(nodeWrapper.getIp()).append(":").append(nodeWrapper.getPort());
+ a.append(",");
+ }
+ a.deleteCharAt(a.length() - 1);
+
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+ TableModelUtils.insertData("test", "test", 0, 1, senderEnv);
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor", "iotdb-extractor");
+ extractorAttributes.put("capture.table", "true");
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", sink);
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.node-urls", a.toString());
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ TableModelUtils.assertCountData("test", "test", 1, receiverEnv);
+ receiverEnv.getDataNodeWrapper(0).stop();
+
+ // Ensure that the kill -9 operation is completed
+ Thread.sleep(5000);
+ TableModelUtils.insertData("test", "test", 1, 2, senderEnv);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
+ if (!nodeWrapper.isAlive()) {
+ continue;
+ }
+ TableModelUtils.assertCountData("test", "test", 2, receiverEnv, nodeWrapper);
+ return;
+ }
+ }
+
@Test
public void testWithAllParametersInStreamingMode() throws Exception {
testWithAllParameters("true");
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
index e234cc6bd5c..b116cad502c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
@@ -422,6 +422,21 @@ public class TableModelUtils {
handleFailure);
}
+ public static void assertCountData(
+ final String database,
+ final String table,
+ final int count,
+ final BaseEnv baseEnv,
+ final DataNodeWrapper wrapper) {
+ TestUtils.assertDataEventuallyOnEnv(
+ baseEnv,
+ wrapper,
+ getQueryCountSql(table),
+ "_col0,",
+ Collections.singleton(count + ","),
+ database);
+ }
+
public static String getDateStr(final int value) {
Date date = new Date(value);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 40a84c4128f..6cee85cf2c2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -256,7 +256,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
// Retry to handshake by PipeTransferHandshakeV1Req.
if (resp.get() != null
&& resp.get().getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
- LOGGER.info(
+ LOGGER.warn(
"Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} "
+ "retry to handshake by PipeTransferHandshakeV1Req.",
targetNodeUrl.getIp(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 1cf5c354d08..78d5b7d5f0a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -125,6 +125,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
return;
}
}
+
+ // If all clients are not available, throw an exception
final StringBuilder errorMessage =
new StringBuilder(
String.format(
@@ -132,7 +134,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
for (final Map.Entry<TEndPoint, String> entry :
endPoint2HandshakeErrorMessage.entrySet()) {
errorMessage
.append(" (")
- .append(" host: ")
+ .append("host: ")
.append(entry.getKey().getIp())
.append(", port: ")
.append(entry.getKey().getPort())
@@ -160,11 +162,15 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
}
}
- initClientAndStatus(clientAndStatus, endPoint);
- sendHandshakeReq(clientAndStatus);
+ // It is necessary to ensure that the client is initialized successfully and not null. If false
+ // is returned, it means that the initialization is not successful and the handshake operation
+ // is not performed.
+ if (initClientAndStatus(clientAndStatus, endPoint)) {
+ sendHandshakeReq(clientAndStatus);
+ }
}
- private void initClientAndStatus(
+ private boolean initClientAndStatus(
final Pair<IoTDBSyncClient, Boolean> clientAndStatus, final TEndPoint endPoint) {
try {
clientAndStatus.setLeft(
@@ -179,14 +185,16 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
useSSL,
trustStorePath,
trustStorePwd));
+ return true;
} catch (Exception e) {
endPoint2HandshakeErrorMessage.put(endPoint, e.getMessage());
- throw new PipeConnectionException(
- String.format(
- PipeConnectionException.CONNECTION_ERROR_FORMATTER,
- endPoint.getIp(),
- endPoint.getPort()),
+ LOGGER.warn(
+ "Failed to initialize client with target server ip: {}, port: {}, because {}",
+ endPoint.getIp(),
+ endPoint.getPort(),
+ e.getMessage(),
e);
+ return false;
}
}
@@ -211,7 +219,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
// Receiver may be an old version, so we need to retry to handshake by
// PipeTransferHandshakeV1Req.
if (resp.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
- LOGGER.info(
+ LOGGER.warn(
"Handshake error with target server ip: {}, port: {}, because: {}. "
+ "Retry to handshake by PipeTransferHandshakeV1Req.",
client.getIpAddress(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 3df2af38a34..799913756e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -189,8 +189,14 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector {
continue;
}
- sendHandshakeReq(socket);
- isSocketAlive.set(i, true);
+ try {
+ sendHandshakeReq(socket);
+ isSocketAlive.set(i, true);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Handshake error occurs. It may be caused by an error on the receiving end. Ignore it.",
+ e);
+ }
}
for (int i = 0; i < sockets.size(); i++) {