This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 63475a16592 Pipe: Add pipeSinkHandshakeTimeout parameter to
distinguish it from pipeSinkTransferTimeout & Call handshake() method of
temporary connectors to validate createPipeStatement (#11655)
63475a16592 is described below
commit 63475a165927191e614936c320d06ff46d70b302
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 13 17:38:48 2023 +0800
Pipe: Add pipeSinkHandshakeTimeout parameter to distinguish it from
pipeSinkTransferTimeout & Call handshake() method of temporary connectors to
validate createPipeStatement (#11655)
Currently, a pipe's handshake and transfer share a single timeout. However,
the handshake needs a shorter timeout so that the procedure itself does not
time out, while the transfer needs a longer timeout to allow for the
time-consuming loading of tsFiles on the receiver. This commit therefore
separates the two timeouts and refactors the related parameters and
syncClient logic.
This commit includes the following modifications:
1. Separated the pipe handshake and transfer timeouts, making the former
shorter.
2. Added handshake logic to pipe SQL validation before the SQL reaches the
ConfigNode.
3. Moved the pipe permission test from the auth ITs to the pipe ITs to
prevent premature failure caused by a fake or self receiver.
4. TestUtils refactor.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/db/it/auth/IoTDBClusterAuthorityIT.java | 5 +-
.../iotdb/db/it/auth/IoTDBSystemPermissionIT.java | 75 +-------------
.../org/apache/iotdb/db/it/utils/TestUtils.java | 108 ++++++++++++---------
.../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 97 ++++++++++++++++++
.../db/pipe/agent/plugin/PipePluginAgent.java | 7 ++
.../env/PipeTaskTemporaryRuntimeEnvironment.java | 30 ++++++
.../protocol/airgap/IoTDBAirGapConnector.java | 2 +-
.../thrift/sync/IoTDBThriftSyncConnector.java | 6 +-
.../sync/IoTDBThriftSyncConnectorClient.java | 12 ++-
.../pipe/receiver/airgap/IoTDBAirGapReceiver.java | 2 +-
.../iotdb/commons/client/ClientPoolFactory.java | 2 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 19 +++-
.../iotdb/commons/conf/CommonDescriptor.java | 13 ++-
.../iotdb/commons/pipe/config/PipeConfig.java | 11 ++-
14 files changed, 248 insertions(+), 141 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBClusterAuthorityIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBClusterAuthorityIT.java
index 47996971fa1..9514474881b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBClusterAuthorityIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBClusterAuthorityIT.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -385,7 +386,7 @@ public class IoTDBClusterAuthorityIT {
authorizerResp = client.queryPermission(authorizerReq);
status = authorizerResp.getStatus();
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- assertEquals(null, authorizerResp.getPermissionInfo().getUserInfo());
+ assertNull(authorizerResp.getPermissionInfo().getUserInfo());
assertEquals(1, authorizerResp.getPermissionInfo().getRoleInfoSize());
assertEquals(
0,
@@ -440,7 +441,7 @@ public class IoTDBClusterAuthorityIT {
authorizerResp = client.queryPermission(authorizerReq);
status = authorizerResp.getStatus();
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- assertEquals(null, authorizerResp.getMemberInfo());
+ assertNull(authorizerResp.getMemberInfo());
assertEquals(new HashMap<>(),
authorizerResp.getPermissionInfo().getRoleInfo());
assertEquals(
new ArrayList<>(),
authorizerResp.getPermissionInfo().getUserInfo().getRoleList());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBSystemPermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBSystemPermissionIT.java
index 1c98040fc01..b270862d4d5 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBSystemPermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/auth/IoTDBSystemPermissionIT.java
@@ -188,79 +188,8 @@ public class IoTDBSystemPermissionIT {
executeQuery("show CQs", "test", "test123");
}
- @Test
- public void managePipe() {
- assertNonQueryTestFail(
- "create pipe testPipe\n"
- + "with connector (\n"
- + " 'connector'='iotdb-thrift-connector',\n"
- + " 'connector.ip'='127.0.0.1',\n"
- + " 'connector.port'='6668'\n"
- + ")",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
- assertNonQueryTestFail(
- "drop pipe testPipe",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
- assertTestFail(
- "show pipes",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
- assertNonQueryTestFail(
- "start pipe testPipe",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
- assertNonQueryTestFail(
- "stop pipe testPipe",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
-
- assertNonQueryTestFail(
- "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
- assertNonQueryTestFail(
- "drop pipePlugin TestProcessor",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
- assertTestFail(
- "show pipe plugins",
- "803: No permissions for this operation, please add privilege
USE_PIPE",
- "test",
- "test123");
-
- grantUserSystemPrivileges("test", PrivilegeType.USE_PIPE);
-
- executeNonQuery(
- "create pipe testPipe\n"
- + "with connector (\n"
- + " 'connector'='iotdb-thrift-connector',\n"
- + " 'connector.ip'='127.0.0.1',\n"
- + " 'connector.port'='6668'\n"
- + ")",
- "test",
- "test123");
- executeQuery("show pipes", "test", "test123");
- executeNonQuery("start pipe testPipe", "test", "test123");
- executeNonQuery("stop pipe testPipe", "test", "test123");
- executeNonQuery("drop pipe testPipe", "test", "test123");
-
- assertNonQueryTestFail(
- "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'",
- "1603: The scheme of URI is not set, please specify the scheme of
URI.",
- "test",
- "test123");
- executeNonQuery("drop pipePlugin TestProcessor", "test", "test123");
- executeQuery("show pipe plugins", "test", "test123");
- }
+ // We test pipe permission in IoTDBPipeLifeCycleIT because a fake or self
receiver
+ // will surely lead to premature failure
@Test
public void maintainOperationsTest() {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 3fa35d05f25..5e5c8bdd1cb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -37,6 +37,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -258,7 +259,12 @@ public class TestUtils {
}
public static void assertTestFail(String sql, String errMsg, String
userName, String password) {
- try (Connection connection = EnvFactory.getEnv().getConnection(userName,
password);
+ assertTestFail(EnvFactory.getEnv(), sql, errMsg, userName, password);
+ }
+
+ public static void assertTestFail(
+ BaseEnv env, String sql, String errMsg, String userName, String
password) {
+ try (Connection connection = env.getConnection(userName, password);
Statement statement = connection.createStatement()) {
statement.executeQuery(sql);
fail("No exception!");
@@ -273,7 +279,12 @@ public class TestUtils {
public static void assertNonQueryTestFail(
String sql, String errMsg, String userName, String password) {
- try (Connection connection = EnvFactory.getEnv().getConnection(userName,
password);
+ assertNonQueryTestFail(EnvFactory.getEnv(), sql, errMsg, userName,
password);
+ }
+
+ public static void assertNonQueryTestFail(
+ BaseEnv env, String sql, String errMsg, String userName, String
password) {
+ try (Connection connection = env.getConnection(userName, password);
Statement statement = connection.createStatement()) {
statement.execute(sql);
fail("No exception!");
@@ -388,35 +399,26 @@ public class TestUtils {
}
}
- // This method will not throw failure given that a failure is encountered.
- // Instead, it return a flag to indicate the result of the execution.
public static boolean tryExecuteNonQueryWithRetry(BaseEnv env, String sql) {
- for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
- try (Connection connection = env.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute(sql);
- return true;
- } catch (SQLException e) {
- if (retryCountLeft > 0) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ignored) {
- }
- } else {
- e.printStackTrace();
- return false;
- }
- }
- }
- return false;
+ return tryExecuteNonQueryWithRetry(env, sql, "root", "root");
+ }
+
+ public static boolean tryExecuteNonQueryWithRetry(
+ BaseEnv env, String sql, String userName, String password) {
+ return tryExecuteNonQueriesWithRetry(env, Collections.singletonList(sql),
userName, password);
+ }
+
+ public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env,
List<String> sqlList) {
+ return tryExecuteNonQueriesWithRetry(env, sqlList, "root", "root");
}
// This method will not throw failure given that a failure is encountered.
// Instead, it return a flag to indicate the result of the execution.
- public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env,
List<String> sqlList) {
+ public static boolean tryExecuteNonQueriesWithRetry(
+ BaseEnv env, List<String> sqlList, String userName, String password) {
int lastIndex = 0;
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
- try (Connection connection = env.getConnection();
+ try (Connection connection = env.getConnection(userName, password);
Statement statement = connection.createStatement()) {
for (int i = lastIndex; i < sqlList.size(); ++i) {
statement.execute(sqlList.get(i));
@@ -463,24 +465,8 @@ public class TestUtils {
// Instead, it return a flag to indicate the result of the execution.
public static boolean tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
BaseEnv env, DataNodeWrapper wrapper, String sql) {
- for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
- try (Connection connection =
env.getConnectionWithSpecifiedDataNode(wrapper);
- Statement statement = connection.createStatement()) {
- statement.execute(sql);
- return true;
- } catch (SQLException e) {
- if (retryCountLeft > 0) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ignored) {
- }
- } else {
- e.printStackTrace();
- return false;
- }
- }
- }
- return false;
+ return tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
+ env, wrapper, Collections.singletonList(sql));
}
public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
@@ -523,6 +509,31 @@ public class TestUtils {
}
}
+ public static void executeQueryWithRetry(
+ BaseEnv env, String sql, String userName, String password) {
+ try (Connection connection = env.getConnection(userName, password);
+ Statement statement = connection.createStatement()) {
+ for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+ try {
+ statement.executeQuery(sql);
+ } catch (SQLException e) {
+ if (retryCountLeft > 0) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignored) {
+ }
+ } else {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
public static ResultSet executeQueryWithRetry(Statement statement, String
sql) {
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
try {
@@ -569,7 +580,11 @@ public class TestUtils {
}
public static void createUser(String userName, String password) {
- try (Connection connection = EnvFactory.getEnv().getConnection();
+ createUser(EnvFactory.getEnv(), userName, password);
+ }
+
+ public static void createUser(BaseEnv env, String userName, String password)
{
+ try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("create user %s '%s'", userName,
password));
} catch (SQLException e) {
@@ -579,7 +594,12 @@ public class TestUtils {
}
public static void grantUserSystemPrivileges(String userName, PrivilegeType
privilegeType) {
- try (Connection connection = EnvFactory.getEnv().getConnection();
+ grantUserSystemPrivileges(EnvFactory.getEnv(), userName, privilegeType);
+ }
+
+ public static void grantUserSystemPrivileges(
+ BaseEnv env, String userName, PrivilegeType privilegeType) {
+ try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("grant %s on root.** to user %s",
privilegeType, userName));
} catch (SQLException e) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index 8226d25969d..b7eacc79c87 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.pipe.it;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
@@ -41,6 +42,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
+import static org.apache.iotdb.db.it.utils.TestUtils.executeQueryWithRetry;
+import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSystemPrivileges;
+import static
org.apache.iotdb.db.it.utils.TestUtils.tryExecuteNonQueriesWithRetry;
+import static
org.apache.iotdb.db.it.utils.TestUtils.tryExecuteNonQueryWithRetry;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
public class IoTDBPipeLifeCycleIT extends AbstractPipeDualIT {
@@ -696,4 +705,92 @@ public class IoTDBPipeLifeCycleIT extends
AbstractPipeDualIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
}
+
+ @Test
+ public void testPermission() {
+ createUser(senderEnv, "test", "test123");
+
+ assertNonQueryTestFail(
+ senderEnv,
+ "create pipe testPipe\n"
+ + "with connector (\n"
+ + " 'connector'='iotdb-thrift-connector',\n"
+ + " 'connector.ip'='127.0.0.1',\n"
+ + " 'connector.port'='6668'\n"
+ + ")",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+ assertNonQueryTestFail(
+ senderEnv,
+ "drop pipe testPipe",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+ assertTestFail(
+ senderEnv,
+ "show pipes",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+ assertNonQueryTestFail(
+ senderEnv,
+ "start pipe testPipe",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+ assertNonQueryTestFail(
+ senderEnv,
+ "stop pipe testPipe",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+
+ assertNonQueryTestFail(
+ senderEnv,
+ "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+ assertNonQueryTestFail(
+ senderEnv,
+ "drop pipePlugin TestProcessor",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+ assertTestFail(
+ senderEnv,
+ "show pipe plugins",
+ "803: No permissions for this operation, please add privilege
USE_PIPE",
+ "test",
+ "test123");
+
+ grantUserSystemPrivileges(senderEnv, "test", PrivilegeType.USE_PIPE);
+
+ tryExecuteNonQueryWithRetry(
+ senderEnv,
+ "create pipe testPipe\n"
+ + "with connector (\n"
+ + " 'connector'='iotdb-thrift-connector',\n"
+ + " 'connector.ip'='127.0.0.1',\n"
+ + " 'connector.port'='6668'\n"
+ + ")",
+ "test",
+ "test123");
+ executeQueryWithRetry(senderEnv, "show pipes", "test", "test123");
+ tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("start pipe testPipe", "stop pipe testPipe", "drop pipe
testPipe"),
+ "test",
+ "test123");
+
+ assertNonQueryTestFail(
+ senderEnv,
+ "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'",
+ "1603: The scheme of URI is not set, please specify the scheme of
URI.",
+ "test",
+ "test123");
+ tryExecuteNonQueryWithRetry(senderEnv, "drop pipePlugin TestProcessor",
"test", "test123");
+ executeQueryWithRetry(senderEnv, "show pipe plugins", "test", "test123");
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 7ee2f99b1ea..66e110e45a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskTemporaryRuntimeEnvironment;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
@@ -240,6 +242,11 @@ public class PipePluginAgent {
final PipeConnector temporaryConnector =
reflectConnector(connectorParameters);
try {
temporaryConnector.validate(new
PipeParameterValidator(connectorParameters));
+ temporaryConnector.customize(
+ connectorParameters,
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskTemporaryRuntimeEnvironment(createPipeStatement)));
+ temporaryConnector.handshake();
} finally {
try {
temporaryConnector.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskTemporaryRuntimeEnvironment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskTemporaryRuntimeEnvironment.java
new file mode 100644
index 00000000000..45abc1f070c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskTemporaryRuntimeEnvironment.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config.plugin.env;
+
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+
+/** For temporary use when validating during creating a new pipe. */
+public class PipeTaskTemporaryRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
+
+ public PipeTaskTemporaryRuntimeEnvironment(CreatePipeStatement
createPipeStatement) {
+ super(createPipeStatement.getPipeName(), System.currentTimeMillis(), -1);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 547fe8f60f6..072db1a58d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -199,7 +199,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
throw new PipeException("Handshake error with target server ip: " + ip
+ ", port: " + port);
} else {
isSocketAlive.set(i, true);
- socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTimeoutMs());
+ socket.setSoTimeout((int)
PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip,
port);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index 2f50a7081b0..f4a538f1b92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -177,7 +177,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
i,
new IoTDBThriftSyncConnectorClient(
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorTimeoutMs())
+ .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),
@@ -186,7 +186,6 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
useSSL,
trustStore,
trustStorePwd));
-
try {
final TPipeTransferResp resp =
clients
@@ -202,6 +201,9 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
resp.status);
} else {
isClientAlive.set(i, true);
+ clients
+ .get(i)
+ .setTimeout((int)
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip,
port);
}
} catch (TException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnectorClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnectorClient.java
index 8e229e9010a..d0c82ecf93a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnectorClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnectorClient.java
@@ -21,8 +21,8 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.thrift.transport.TTransport;
@@ -47,19 +47,21 @@ public class IoTDBThriftSyncConnectorClient extends
IClientRPCService.Client
? RpcTransportFactory.INSTANCE.getTransport(
ipAddress,
port,
- (int)
PipeConfig.getInstance().getPipeConnectorTimeoutMs(),
+ property.getConnectionTimeoutMs(),
trustStore,
trustStorePwd)
: RpcTransportFactory.INSTANCE.getTransport(
- ipAddress,
- port,
- (int)
PipeConfig.getInstance().getPipeConnectorTimeoutMs())));
+ ipAddress, port, property.getConnectionTimeoutMs())));
TTransport transport = getInputProtocol().getTransport();
if (!transport.isOpen()) {
transport.open();
}
}
+ public void setTimeout(int timeout) {
+ ((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
+ }
+
@Override
public void close() throws Exception {
invalidate();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
index 11dd9b29be5..3c7f038ccef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
@@ -74,7 +74,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
@Override
public void runMayThrow() throws Throwable {
- socket.setSoTimeout((int)
PipeConfig.getInstance().getPipeConnectorTimeoutMs());
+ socket.setSoTimeout((int)
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
socket.setKeepAlive(true);
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId,
socket);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 9297136365f..077906332f0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -265,7 +265,7 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs((int)
conf.getPipeConnectorTimeoutMs())
+ .setConnectionTimeoutMs((int)
conf.getPipeConnectorTransferTimeoutMs())
.setRpcThriftCompressionEnabled(
conf.isPipeConnectorRPCThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 579d59e3720..683654d3bb0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -162,7 +162,8 @@ public class CommonConfig {
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50;
// 50B
private int pipeExtractorMatcherCacheSize = 1024;
- private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
+ private long pipeConnectorHandshakeTimeoutMs = 10 * 1000L; // 10 seconds
+ private long pipeConnectorTransferTimeoutMs = 15 * 60 * 1000L; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
// recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
@@ -561,12 +562,20 @@ public class CommonConfig {
this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
}
- public long getPipeConnectorTimeoutMs() {
- return pipeConnectorTimeoutMs;
+ public long getPipeConnectorHandshakeTimeoutMs() {
+ return pipeConnectorHandshakeTimeoutMs;
}
- public void setPipeConnectorTimeoutMs(long pipeConnectorTimeoutMs) {
- this.pipeConnectorTimeoutMs = pipeConnectorTimeoutMs;
+ public void setPipeConnectorHandshakeTimeoutMs(long
pipeConnectorHandshakeTimeoutMs) {
+ this.pipeConnectorHandshakeTimeoutMs = pipeConnectorHandshakeTimeoutMs;
+ }
+
+ public long getPipeConnectorTransferTimeoutMs() {
+ return pipeConnectorTransferTimeoutMs;
+ }
+
+ public void setPipeConnectorTransferTimeoutMs(long
pipeConnectorTransferTimeoutMs) {
+ this.pipeConnectorTransferTimeoutMs = pipeConnectorTransferTimeoutMs;
}
public int getPipeConnectorReadFileBufferSize() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index a7ae02c1bea..e7c4b127af7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.commons.conf;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
-import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
@@ -56,7 +55,7 @@ public class CommonDescriptor {
config.setProcedureWalFolder(systemDir + File.separator + "procedure");
}
- public void loadCommonProps(Properties properties) throws
BadNodeUrlException {
+ public void loadCommonProps(Properties properties) {
config.setAuthorizerProvider(
properties.getProperty("authorizer_provider_class",
config.getAuthorizerProvider()).trim());
// if using org.apache.iotdb.db.auth.authorizer.OpenIdAuthorizer,
openID_url is needed.
@@ -310,10 +309,16 @@ public class CommonDescriptor {
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
- config.setPipeConnectorTimeoutMs(
+ config.setPipeConnectorHandshakeTimeoutMs(
Long.parseLong(
properties.getProperty(
- "pipe_connector_timeout_ms",
String.valueOf(config.getPipeConnectorTimeoutMs()))));
+ "pipe_connector_handshake_timeout_ms",
+ String.valueOf(config.getPipeConnectorHandshakeTimeoutMs()))));
+ config.setPipeConnectorTransferTimeoutMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_connector_timeout_ms",
+ String.valueOf(config.getPipeConnectorTransferTimeoutMs()))));
config.setPipeConnectorReadFileBufferSize(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c70def090c5..b16e588783f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -91,8 +91,12 @@ public class PipeConfig {
/////////////////////////////// Connector ///////////////////////////////
- public long getPipeConnectorTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorTimeoutMs();
+ public long getPipeConnectorHandshakeTimeoutMs() {
+ return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+ }
+
+ public long getPipeConnectorTransferTimeoutMs() {
+ return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
}
public int getPipeConnectorReadFileBufferSize() {
@@ -229,7 +233,8 @@ public class PipeConfig {
getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
LOGGER.info("PipeExtractorMatcherCacheSize: {}",
getPipeExtractorMatcherCacheSize());
- LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
+ LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
+ LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
LOGGER.info("PipeConnectorPendingQueueSize: {}",
getPipeConnectorPendingQueueSize());