This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 02ff0d27c57 Pipe / Load: Enable validation skip for load tsFile
(#14774)
02ff0d27c57 is described below
commit 02ff0d27c57ed79fb294fea08ed1583a2f8434d2
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 5 15:27:36 2025 +0800
Pipe / Load: Enable validation skip for load tsFile (#14774)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 71 ++++++++++++++++
.../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java | 97 ++++++++++++++++++++++
.../org/apache/iotdb/tool/tsfile/ImportTsFile.java | 12 ++-
.../iotdb/tool/tsfile/ImportTsFileLocally.java | 8 +-
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 8 ++
.../client/IoTDBConfigNodeSyncClientManager.java | 6 +-
.../protocol/IoTDBConfigRegionAirGapConnector.java | 3 +
.../protocol/IoTDBConfigRegionConnector.java | 6 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../client/IoTDBDataNodeAsyncClientManager.java | 9 +-
.../client/IoTDBDataNodeSyncClientManager.java | 6 +-
.../airgap/IoTDBDataNodeAirGapConnector.java | 3 +
.../async/IoTDBDataRegionAsyncConnector.java | 3 +-
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 6 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 2 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 2 +-
.../db/queryengine/plan/parser/ASTVisitor.java | 5 +-
.../plan/relational/sql/ast/LoadTsFile.java | 7 ++
.../load/active/ActiveLoadTsFileLoader.java | 3 +-
.../load/config/LoadTsFileConfigurator.java | 20 +++++
.../config/constant/PipeConnectorConstant.java | 5 ++
.../pipe/connector/client/IoTDBClientManager.java | 6 +-
.../connector/client/IoTDBSyncClientManager.java | 9 +-
.../common/PipeTransferHandshakeConstant.java | 1 +
.../pipe/connector/protocol/IoTDBConnector.java | 8 ++
.../connector/protocol/IoTDBSslSyncConnector.java | 6 +-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 7 ++
28 files changed, 310 insertions(+), 26 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 26c76bf0042..7facee0bbf0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -448,4 +448,75 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,",
"3,1.0,", "4,1.0,"))));
}
}
+
+ @Test
+ public void testSyncLoadTsFileWithoutVerify() throws Exception {
+ testLoadTsFileWithoutVerify("sync");
+ }
+
+ @Test
+ public void testAsyncLoadTsFileWithoutVerify() throws Exception {
+ testLoadTsFileWithoutVerify("async");
+ }
+
+ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy)
throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)",
"flush"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.realtime.mode", "batch");
+
+ connectorAttributes.put("sink", "iotdb-thrift-sink");
+ connectorAttributes.put("sink.batch.enable", "false");
+ connectorAttributes.put("sink.ip", receiverIp);
+ connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+ connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
+ connectorAttributes.put("sink.tsfile.validation", "false");
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes))
+ .getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "create timeSeries root.vehicle.d0.s1 int32",
+ "insert into root.vehicle.d0(time, s1) values (2, 1)",
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,",
"2,1.0,"))));
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
index 1faf4a4c65c..2867a340250 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
@@ -694,4 +694,101 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeTableModelTestIT {
}
}
}
+
+ @Test
+ public void testLoadTsFileWithoutVerify() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)",
"flush"))) {
+ return;
+ }
+
+ for (int i = 0; i < 5; i++) {
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test" + i, "test0");
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test" + i, "test1");
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.realtime.mode", "batch");
+ extractorAttributes.put("capture.table", "true");
+ extractorAttributes.put("capture.tree", "true");
+
+ connectorAttributes.put("sink", "iotdb-thrift-sink");
+ connectorAttributes.put("sink.batch.enable", "false");
+ connectorAttributes.put("sink.ip", receiverIp);
+ connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+ connectorAttributes.put("sink.tsfile.validation", "false");
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes))
+ .getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "create timeSeries root.vehicle.d0.s1 int32",
+ "insert into root.vehicle.d0(time, s1) values (2, 1)",
+ "flush"))) {
+ return;
+ }
+
+ Map<String, List<Tablet>> testResult = new HashMap<>();
+ Map<String, List<Tablet>> test1Result = new HashMap<>();
+
+ insertTablet1(testResult, test1Result);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,",
"2,1.0,"))));
+
+ for (Map.Entry<String, List<Tablet>> entry : testResult.entrySet()) {
+ final Set<String> set = new HashSet<>();
+ entry
+ .getValue()
+ .forEach(
+ tablet -> {
+ set.addAll(TableModelUtils.generateExpectedResults(tablet));
+ });
+ TableModelUtils.assertCountData("test0", entry.getKey(), set.size(),
receiverEnv, s -> {});
+ TableModelUtils.assertData("test0", entry.getKey(), set, receiverEnv,
s -> {});
+ }
+
+ for (Map.Entry<String, List<Tablet>> entry : test1Result.entrySet()) {
+ final Set<String> set = new HashSet<>();
+ entry
+ .getValue()
+ .forEach(
+ tablet -> {
+ set.addAll(TableModelUtils.generateExpectedResults(tablet));
+ });
+ TableModelUtils.assertCountData("test1", entry.getKey(), set.size(),
receiverEnv, s -> {});
+ TableModelUtils.assertData("test1", entry.getKey(), set, receiverEnv,
s -> {});
+ }
+ }
+ }
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 50cd1a03062..cdfbd5b4437 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -60,6 +60,9 @@ public class ImportTsFile extends AbstractTsFileTool {
private static final String THREAD_NUM_ARGS = "tn";
private static final String THREAD_NUM_NAME = "thread_num";
+ protected static final String VERIFY_ARGS = "v";
+ protected static final String VERIFY_NAME = "verify";
+
private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
private static final String TS_FILE_CLI_PREFIX = "ImportTsFile";
@@ -80,7 +83,7 @@ public class ImportTsFile extends AbstractTsFileTool {
private static int threadNum = 8;
private static boolean isRemoteLoad = true;
-
+ protected static boolean verify = true;
private static SessionPool sessionPool;
private static void createOptions() {
@@ -299,6 +302,11 @@ public class ImportTsFile extends AbstractTsFileTool {
if (commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS) != null) {
timestampPrecision =
commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS);
}
+
+ verify =
+ null != commandLine.getOptionValue(VERIFY_ARGS)
+ ? Boolean.parseBoolean(commandLine.getOptionValue(VERIFY_ARGS))
+ : verify;
}
public static boolean isFileStoreEquals(String pathString, File dir) {
@@ -387,12 +395,14 @@ public class ImportTsFile extends AbstractTsFileTool {
}
ImportTsFileLocally.setSessionPool(sessionPool);
+ ImportTsFileLocally.setVerify(verify);
// ImportTsFileRemotely
ImportTsFileRemotely.setHost(host);
ImportTsFileRemotely.setPort(port);
ImportTsFileRemotely.setUsername(username);
ImportTsFileRemotely.setPassword(password);
+ ImportTsFileRemotely.setValidateTsFile(verify);
// ImportTsFileBase
ImportTsFileBase.setSuccessAndFailDirAndOperation(
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
index 8f65d26fd32..f94050e98eb 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
@@ -27,13 +27,15 @@ public class ImportTsFileLocally extends ImportTsFileBase
implements Runnable {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
private static SessionPool sessionPool;
+ private static boolean verify;
@Override
public void loadTsFile() {
String filePath;
try {
while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
- final String sql = "load '" + filePath + "' onSuccess=none ";
+ final String sql =
+ "load '" + filePath + "' onSuccess=none " + (verify ? "" :
"verify=false");
try {
sessionPool.executeNonQueryStatement(sql);
@@ -50,4 +52,8 @@ public class ImportTsFileLocally extends ImportTsFileBase
implements Runnable {
public static void setSessionPool(SessionPool sessionPool) {
ImportTsFileLocally.sessionPool = sessionPool;
}
+
+ public static void setVerify(boolean verify) {
+ ImportTsFileLocally.verify = verify;
+ }
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 5459b0448ef..9f4e09fa364 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -74,6 +74,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
private static String username = SessionConfig.DEFAULT_USER;
private static String password = SessionConfig.DEFAULT_PASSWORD;
+ private static boolean validateTsFile;
public ImportTsFileRemotely(String timePrecision) {
setTimePrecision(timePrecision);
@@ -189,6 +190,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
LOAD_STRATEGY);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+ Boolean.toString(validateTsFile));
return params;
}
@@ -346,4 +350,8 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
public static void setPassword(final String password) {
ImportTsFileRemotely.password = password;
}
+
+ public static void setValidateTsFile(final boolean validateTsFile) {
+ ImportTsFileRemotely.validateTsFile = validateTsFile;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index 8ae0651187e..33013ff6f39 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -44,7 +44,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
String username,
String password,
boolean shouldReceiverConvertOnTypeMismatch,
- String loadTsFileStrategy) {
+ String loadTsFileStrategy,
+ boolean validateTsFile) {
super(
endPoints,
useSSL,
@@ -55,7 +56,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index f20a412a7c0..ce74675dc44 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -79,6 +79,9 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+ Boolean.toString(loadTsFileValidation));
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 3fb90f03cf0..34d99517d30 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -73,7 +73,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
- final String loadTsFileStrategy) {
+ final String loadTsFileStrategy,
+ final boolean validateTsFile) {
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
useSSL,
@@ -83,7 +84,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ec6884eeccf..05c36271e60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1201,11 +1201,12 @@ public class IoTDBConfig {
+ IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+ File.separator
+ IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME;
-
private long loadActiveListeningCheckIntervalSeconds = 5L;
private int loadActiveListeningMaxThreadNum =
Runtime.getRuntime().availableProcessors();
+ private boolean loadActiveListeningVerifyEnable = true;
+
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during
querying */
private String[] pipeReceiverFileDirs = new String[0];
@@ -4146,6 +4147,14 @@ public class IoTDBConfig {
this.loadActiveListeningMaxThreadNum = loadActiveListeningMaxThreadNum;
}
+ public boolean isLoadActiveListeningVerifyEnable() {
+ return loadActiveListeningVerifyEnable;
+ }
+
+ public void setLoadActiveListeningVerifyEnable(boolean
loadActiveListeningVerifyEnable) {
+ this.loadActiveListeningVerifyEnable = loadActiveListeningVerifyEnable;
+ }
+
public long getLoadActiveListeningCheckIntervalSeconds() {
return loadActiveListeningCheckIntervalSeconds;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2d567c401c3..890e37ae477 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2470,6 +2470,12 @@ public class IoTDBDescriptor {
if (conf.getLoadActiveListeningMaxThreadNum() <= 0) {
conf.setLoadActiveListeningMaxThreadNum(Runtime.getRuntime().availableProcessors());
}
+
+ conf.setLoadActiveListeningVerifyEnable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "load_active_listening_verify_enable",
+ Boolean.toString(conf.isLoadActiveListeningVerifyEnable()))));
}
private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws
IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 6cee85cf2c2..cd6e3e48c52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -84,14 +84,16 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
- final String loadTsFileStrategy) {
+ final String loadTsFileStrategy,
+ final boolean validateTsFile) {
super(
endPoints,
useLeaderCache,
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile);
endPointSet = new HashSet<>(endPoints);
@@ -248,6 +250,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME,
username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD,
password);
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+ Boolean.toString(validateTsFile));
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index c9f9489e164..e89268b2a23 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -54,7 +54,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
- final String loadTsFileStrategy) {
+ final String loadTsFileStrategy,
+ final boolean validateTsFile) {
super(
endPoints,
useSSL,
@@ -65,7 +66,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 591db25fc6a..3bb2348e75d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -106,6 +106,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+ Boolean.toString(loadTsFileValidation));
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 72b6f29494f..242564ea9bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -137,7 +137,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ loadTsFileValidation);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index d3386a3eb93..02c98f2958a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -87,7 +87,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
- final String loadTsFileStrategy) {
+ final String loadTsFileStrategy,
+ final boolean validateTsFile) {
clientManager =
new IoTDBDataNodeSyncClientManager(
nodeUrls,
@@ -99,7 +100,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile);
return clientManager;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index ec0274a38fc..d72cbc030dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -594,7 +594,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final LoadTsFileStatement statement = new
LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(true);
+ statement.setVerifySchema(validateTsFile.get());
statement.setAutoCreateDatabase(false);
statement.setModel(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 83d685c007c..92cd9584841 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -107,7 +107,7 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
this.loadTsFileTableStatement = loadTsFileTableStatement;
this.tsFiles = loadTsFileTableStatement.getTsFiles();
this.statementString = loadTsFileTableStatement.toString();
- this.isVerifySchema = true;
+ this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch =
loadTsFileTableStatement.isConvertOnTypeMismatch();
this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 36dda455814..d7118d7e70e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -2078,10 +2078,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
} else if (ctx.SGLEVEL() != null) {
loadTsFileStatement.setDatabaseLevel(Integer.parseInt(ctx.INTEGER_LITERAL().getText()));
} else if (ctx.VERIFY() != null) {
- if (!Boolean.parseBoolean(ctx.boolean_literal().getText())) {
- throw new SemanticException("Load option VERIFY can only be set to
true.");
- }
- loadTsFileStatement.setVerifySchema(true);
+
loadTsFileStatement.setVerifySchema(Boolean.parseBoolean(ctx.boolean_literal().getText()));
} else {
throw new SemanticException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index 669d8da6645..4d7312a31b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -44,6 +44,7 @@ public class LoadTsFile extends Statement {
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
private boolean autoCreateDatabase = true;
+ private boolean verify;
private boolean isGeneratedByPipe = false;
private String model = LoadTsFileConfigurator.MODEL_TABLE_VALUE;
@@ -61,6 +62,7 @@ public class LoadTsFile extends Statement {
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
+ this.verify = true;
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
@@ -100,6 +102,10 @@ public class LoadTsFile extends Statement {
return convertOnTypeMismatch;
}
+ public boolean isVerifySchema() {
+ return verify;
+ }
+
public int getDatabaseLevel() {
return databaseLevel;
}
@@ -151,6 +157,7 @@ public class LoadTsFile extends Statement {
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
+ this.verify =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
this.model =
LoadTsFileConfigurator.parseOrGetDefaultModel(
loadAttributes, LoadTsFileConfigurator.MODEL_TABLE_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 8cb79658b66..65665f5e3d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -73,6 +73,7 @@ public class ActiveLoadTsFileLoader {
private final AtomicReference<WrappedThreadPoolExecutor> activeLoadExecutor =
new AtomicReference<>();
private final AtomicReference<String> failDir = new AtomicReference<>();
+ private final boolean isVerify =
IOTDB_CONFIG.isLoadActiveListeningVerifyEnable();
public int getCurrentAllowedPendingSize() {
return MAX_PENDING_SIZE - pendingQueue.size();
@@ -198,7 +199,7 @@ public class ActiveLoadTsFileLoader {
final LoadTsFileStatement statement = new
LoadTsFileStatement(filePair.getLeft());
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(true);
+ statement.setVerifySchema(isVerify);
statement.setAutoCreateDatabase(false);
return executeStatement(filePair.getRight() ? new
PipeEnrichedStatement(statement) : statement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 70d7401c560..b1736f0f1af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -52,6 +52,9 @@ public class LoadTsFileConfigurator {
case CONVERT_ON_TYPE_MISMATCH_KEY:
validateConvertOnTypeMismatchParam(value);
break;
+ case VERIFY_KEY:
+ validateVerifyParam(value);
+ break;
default:
throw new SemanticException("Invalid parameter '" + key + "' for LOAD
TSFILE command.");
}
@@ -133,6 +136,23 @@ public class LoadTsFileConfigurator {
CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
}
+ public static final String VERIFY_KEY = "verify";
+ private static final boolean VERIFY_DEFAULT_VALUE = true;
+
+ public static void validateVerifyParam(final String verify) {
+ if (!"true".equalsIgnoreCase(verify) && !"false".equalsIgnoreCase(verify))
{
+ throw new SemanticException(
+ String.format(
+ "Given %s value '%s' is not supported, please input a valid
boolean value.",
+ VERIFY_KEY, verify));
+ }
+ }
+
+ public static boolean parseOrGetDefaultVerify(final Map<String, String>
loadAttributes) {
+ return Boolean.parseBoolean(
+ loadAttributes.getOrDefault(VERIFY_KEY,
String.valueOf(VERIFY_DEFAULT_VALUE)));
+ }
+
public static final String MODEL_KEY = "model";
public static final String MODEL_TREE_VALUE = "tree";
public static final String MODEL_TABLE_VALUE = "table";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 526853049d3..c66e6c9c0c6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -242,6 +242,11 @@ public class PipeConnectorConstant {
CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE,
CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)));
+ public static final String CONNECTOR_LOAD_TSFILE_VALIDATION_KEY =
+ "connector.load-tsfile-validation";
+ public static final String SINK_LOAD_TSFILE_VALIDATION_KEY =
"sink.load-tsfile-validation";
+ public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE =
true;
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 37449f9b17b..2c4921712e4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -42,6 +42,8 @@ public abstract class IoTDBClientManager {
protected final String username;
protected final String password;
+ protected final boolean validateTsFile;
+
protected final boolean shouldReceiverConvertOnTypeMismatch;
protected final String loadTsFileStrategy;
@@ -62,7 +64,8 @@ public abstract class IoTDBClientManager {
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
- final String loadTsFileStrategy) {
+ final String loadTsFileStrategy,
+ final boolean validateTsFile) {
this.endPointList = endPointList;
this.useLeaderCache = useLeaderCache;
@@ -71,6 +74,7 @@ public abstract class IoTDBClientManager {
this.password = password;
this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
this.loadTsFileStrategy = loadTsFileStrategy;
+ this.validateTsFile = validateTsFile;
}
public boolean supportModsIfIsDataNodeReceiver() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 78d5b7d5f0a..2c9830212dc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -73,14 +73,16 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
String username,
String password,
boolean shouldReceiverConvertOnTypeMismatch,
- String loadTsFileStrategy) {
+ String loadTsFileStrategy,
+ boolean validateTsFile) {
super(
endPoints,
useLeaderCache,
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile);
this.useSSL = useSSL;
this.trustStorePath = trustStorePath;
@@ -213,6 +215,9 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME,
username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD,
password);
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+ Boolean.toString(validateTsFile));
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 46bd38ba45a..4291427a9bc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -27,6 +27,7 @@ public class PipeTransferHandshakeConstant {
public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY =
"loadTsFileStrategy";
public static final String HANDSHAKE_KEY_USERNAME = "username";
public static final String HANDSHAKE_KEY_PASSWORD = "password";
+ public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
private PipeTransferHandshakeConstant() {
// Utility class
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 3d173a49a8a..e61f22ee9f5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -93,6 +93,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SET;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -115,6 +117,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
@TreeModel
@@ -136,6 +139,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected String loadBalanceStrategy;
protected String loadTsFileStrategy;
+ protected boolean loadTsFileValidation;
private boolean isRpcCompressionEnabled;
private final List<PipeCompressor> compressors = new ArrayList<>();
@@ -236,6 +240,10 @@ public abstract class IoTDBConnector implements
PipeConnector {
"Load tsfile strategy should be one of %s, but got %s.",
CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy),
loadTsFileStrategy);
+ loadTsFileValidation =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_LOAD_TSFILE_VALIDATION_KEY,
SINK_LOAD_TSFILE_VALIDATION_KEY),
+ CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE);
final int zstdCompressionLevel =
parameters.getIntOrDefault(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 329bbd031b8..6909a3b1384 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -130,7 +130,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
username,
password,
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ loadTsFileValidation);
}
protected abstract IoTDBSyncClientManager constructClient(
@@ -145,7 +146,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
- final String loadTsFileStrategy);
+ final String loadTsFileStrategy,
+ final boolean validateTsFile);
@Override
public void handshake() throws Exception {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index c2259a3c8b7..0d064ae8ee6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -84,6 +84,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
// Used to determine current strategy is sync or async
protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new
AtomicBoolean(false);
+ protected final AtomicBoolean validateTsFile = new AtomicBoolean(true);
@Override
public IoTDBConnectorRequestVersion getVersion() {
@@ -282,6 +283,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
loadTsFileStrategyString));
}
+ validateTsFile.set(
+ Boolean.parseBoolean(
+ req.getParams()
+ .getOrDefault(
+
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true")));
+
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation
logic, though
// it may not require the actual type of the v1 request.