This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5581213a89e [To dev/1.3] Pipe / Load: Enable validation skip for load 
tsFile (#14774) (#14776)
5581213a89e is described below

commit 5581213a89eb9eff234a1877b2facde7d64d0233
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 5 16:49:03 2025 +0800

    [To dev/1.3] Pipe / Load: Enable validation skip for load tsFile (#14774) 
(#14776)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    | 72 ++++++++++++++++++++++
 .../org/apache/iotdb/tool/tsfile/ImportTsFile.java | 12 +++-
 .../iotdb/tool/tsfile/ImportTsFileLocally.java     |  8 ++-
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    |  8 +++
 .../client/IoTDBConfigNodeSyncClientManager.java   |  6 +-
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  3 +
 .../protocol/IoTDBConfigRegionConnector.java       |  6 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++
 .../client/IoTDBDataNodeAsyncClientManager.java    |  9 ++-
 .../client/IoTDBDataNodeSyncClientManager.java     |  6 +-
 .../airgap/IoTDBDataNodeAirGapConnector.java       |  3 +
 .../async/IoTDBDataRegionAsyncConnector.java       |  3 +-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |  6 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  2 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |  5 +-
 .../load/active/ActiveLoadTsFileLoader.java        |  3 +-
 .../load/config/LoadTsFileConfigurator.java        | 20 ++++++
 .../config/constant/PipeConnectorConstant.java     |  5 ++
 .../pipe/connector/client/IoTDBClientManager.java  |  6 +-
 .../connector/client/IoTDBSyncClientManager.java   |  9 ++-
 .../common/PipeTransferHandshakeConstant.java      |  1 +
 .../pipe/connector/protocol/IoTDBConnector.java    |  8 +++
 .../connector/protocol/IoTDBSslSyncConnector.java  |  6 +-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  7 +++
 25 files changed, 206 insertions(+), 25 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 26c76bf0042..7fbb0f3a717 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -448,4 +448,76 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
               new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", 
"3,1.0,", "4,1.0,"))));
     }
   }
+
+  @Test
+  public void testSyncLoadTsFileWithoutVerify() throws Exception {
+    testLoadTsFileWithoutVerify("sync");
+  }
+
+  @Test
+  public void testAsyncLoadTsFileWithoutVerify() throws Exception {
+    testLoadTsFileWithoutVerify("async");
+  }
+
+  @Test
+  private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) 
throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", 
"flush"))) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.realtime.mode", "forced-log");
+
+      connectorAttributes.put("sink", "iotdb-thrift-sink");
+      connectorAttributes.put("sink.batch.enable", "false");
+      connectorAttributes.put("sink.ip", receiverIp);
+      connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+      connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
+      connectorAttributes.put("sink.tsfile.validation", "false");
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", connectorAttributes)
+                      .setExtractorAttributes(extractorAttributes)
+                      .setProcessorAttributes(processorAttributes))
+              .getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "create timeSeries root.vehicle.d0.s1 int32",
+              "insert into root.vehicle.d0(time, s1) values (2, 1)",
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", 
"2,1.0,"))));
+    }
+  }
 }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 50cd1a03062..cdfbd5b4437 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -60,6 +60,9 @@ public class ImportTsFile extends AbstractTsFileTool {
   private static final String THREAD_NUM_ARGS = "tn";
   private static final String THREAD_NUM_NAME = "thread_num";
 
+  protected static final String VERIFY_ARGS = "v";
+  protected static final String VERIFY_NAME = "verify";
+
   private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
 
   private static final String TS_FILE_CLI_PREFIX = "ImportTsFile";
@@ -80,7 +83,7 @@ public class ImportTsFile extends AbstractTsFileTool {
   private static int threadNum = 8;
 
   private static boolean isRemoteLoad = true;
-
+  protected static boolean verify = true;
   private static SessionPool sessionPool;
 
   private static void createOptions() {
@@ -299,6 +302,11 @@ public class ImportTsFile extends AbstractTsFileTool {
     if (commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS) != null) {
       timestampPrecision = 
commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS);
     }
+
+    verify =
+        null != commandLine.getOptionValue(VERIFY_ARGS)
+            ? Boolean.parseBoolean(commandLine.getOptionValue(VERIFY_ARGS))
+            : verify;
   }
 
   public static boolean isFileStoreEquals(String pathString, File dir) {
@@ -387,12 +395,14 @@ public class ImportTsFile extends AbstractTsFileTool {
     }
 
     ImportTsFileLocally.setSessionPool(sessionPool);
+    ImportTsFileLocally.setVerify(verify);
 
     // ImportTsFileRemotely
     ImportTsFileRemotely.setHost(host);
     ImportTsFileRemotely.setPort(port);
     ImportTsFileRemotely.setUsername(username);
     ImportTsFileRemotely.setPassword(password);
+    ImportTsFileRemotely.setValidateTsFile(verify);
 
     // ImportTsFileBase
     ImportTsFileBase.setSuccessAndFailDirAndOperation(
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
index 8f65d26fd32..f94050e98eb 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
@@ -27,13 +27,15 @@ public class ImportTsFileLocally extends ImportTsFileBase 
implements Runnable {
   private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
 
   private static SessionPool sessionPool;
+  private static boolean verify;
 
   @Override
   public void loadTsFile() {
     String filePath;
     try {
       while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
-        final String sql = "load '" + filePath + "' onSuccess=none ";
+        final String sql =
+            "load '" + filePath + "' onSuccess=none " + (verify ? "" : 
"verify=false");
         try {
           sessionPool.executeNonQueryStatement(sql);
 
@@ -50,4 +52,8 @@ public class ImportTsFileLocally extends ImportTsFileBase 
implements Runnable {
   public static void setSessionPool(SessionPool sessionPool) {
     ImportTsFileLocally.sessionPool = sessionPool;
   }
+
+  public static void setVerify(boolean verify) {
+    ImportTsFileLocally.verify = verify;
+  }
 }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 5459b0448ef..9f4e09fa364 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -74,6 +74,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
 
   private static String username = SessionConfig.DEFAULT_USER;
   private static String password = SessionConfig.DEFAULT_PASSWORD;
+  private static boolean validateTsFile;
 
   public ImportTsFileRemotely(String timePrecision) {
     setTimePrecision(timePrecision);
@@ -189,6 +190,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
     
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
LOAD_STRATEGY);
     params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
     params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+        Boolean.toString(validateTsFile));
     return params;
   }
 
@@ -346,4 +350,8 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
   public static void setPassword(final String password) {
     ImportTsFileRemotely.password = password;
   }
+
+  public static void setValidateTsFile(final boolean validateTsFile) {
+    ImportTsFileRemotely.validateTsFile = validateTsFile;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index 00a06c926c8..ab678f98864 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -42,7 +42,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
       String trustStorePwd,
       String loadBalanceStrategy,
       boolean shouldReceiverConvertOnTypeMismatch,
-      String loadTsFileStrategy) {
+      String loadTsFileStrategy,
+      boolean validateTsFile) {
     super(
         endPoints,
         username,
@@ -53,7 +54,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
         false,
         loadBalanceStrategy,
         shouldReceiverConvertOnTypeMismatch,
-        loadTsFileStrategy);
+        loadTsFileStrategy,
+        validateTsFile);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 7dd90f18dee..de8b8fe5d98 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -75,6 +75,9 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
     params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
     params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+        Boolean.toString(loadTsFileValidation));
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 809daf044c0..c2a327fb853 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -67,7 +67,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       final boolean useLeaderCache,
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
-      final String loadTsFileStrategy) {
+      final String loadTsFileStrategy,
+      final boolean validateTsFile) {
     return new IoTDBConfigNodeSyncClientManager(
         nodeUrls,
         username,
@@ -77,7 +78,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
         trustStorePwd,
         loadBalanceStrategy,
         shouldReceiverConvertOnTypeMismatch,
-        loadTsFileStrategy);
+        loadTsFileStrategy,
+        validateTsFile);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5113e8c045c..b2e99733072 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1172,11 +1172,12 @@ public class IoTDBConfig {
           + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
           + File.separator
           + IoTDBConstant.LOAD_TSFILE_ACTIVE_LISTENING_FAILED_FOLDER_NAME;
-
   private long loadActiveListeningCheckIntervalSeconds = 5L;
 
   private int loadActiveListeningMaxThreadNum = 
Runtime.getRuntime().availableProcessors();
 
+  private boolean loadActiveListeningVerifyEnable = true;
+
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during 
querying */
   private String[] pipeReceiverFileDirs = new String[0];
@@ -4024,6 +4025,14 @@ public class IoTDBConfig {
     this.loadActiveListeningMaxThreadNum = loadActiveListeningMaxThreadNum;
   }
 
+  public boolean isLoadActiveListeningVerifyEnable() {
+    return loadActiveListeningVerifyEnable;
+  }
+
+  public void setLoadActiveListeningVerifyEnable(boolean 
loadActiveListeningVerifyEnable) {
+    this.loadActiveListeningVerifyEnable = loadActiveListeningVerifyEnable;
+  }
+
   public long getLoadActiveListeningCheckIntervalSeconds() {
     return loadActiveListeningCheckIntervalSeconds;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3fcb72d3686..c725b336e90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2422,6 +2422,12 @@ public class IoTDBDescriptor {
     if (conf.getLoadActiveListeningMaxThreadNum() <= 0) {
       
conf.setLoadActiveListeningMaxThreadNum(Runtime.getRuntime().availableProcessors());
     }
+
+    conf.setLoadActiveListeningVerifyEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "load_active_listening_verify_enable",
+                Boolean.toString(conf.isLoadActiveListeningVerifyEnable()))));
   }
 
   private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 74e56f728dc..11ef6aec7f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -84,14 +84,16 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       final String username,
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
-      final String loadTsFileStrategy) {
+      final String loadTsFileStrategy,
+      final boolean validateTsFile) {
     super(
         endPoints,
         username,
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        useLeaderCache);
+        useLeaderCache,
+        validateTsFile);
 
     endPointSet = new HashSet<>(endPoints);
 
@@ -248,6 +250,9 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
       params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, 
username);
       params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, 
password);
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+          Boolean.toString(validateTsFile));
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index ae3f07b068c..62b8b563075 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -52,7 +52,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       final boolean useLeaderCache,
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
-      final String loadTsFileStrategy) {
+      final String loadTsFileStrategy,
+      final boolean validateTsFile) {
     super(
         endPoints,
         username,
@@ -63,7 +64,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
         useLeaderCache,
         loadBalanceStrategy,
         shouldReceiverConvertOnTypeMismatch,
-        loadTsFileStrategy);
+        loadTsFileStrategy,
+        validateTsFile);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 788244f738c..537d06bf767 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -107,6 +107,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
     params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
     params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+        Boolean.toString(loadTsFileValidation));
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7641eb30f8e..56c3f08d647 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -133,7 +133,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             username,
             password,
             shouldReceiverConvertOnTypeMismatch,
-            loadTsFileStrategy);
+            loadTsFileStrategy,
+            loadTsFileValidation);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 0ae8ae8743a..5e8624397e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -91,7 +91,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
       final boolean useLeaderCache,
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
-      final String loadTsFileStrategy) {
+      final String loadTsFileStrategy,
+      final boolean validateTsFile) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
             nodeUrls,
@@ -103,7 +104,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
             useLeaderCache,
             loadBalanceStrategy,
             shouldReceiverConvertOnTypeMismatch,
-            loadTsFileStrategy);
+            loadTsFileStrategy,
+            validateTsFile);
     return clientManager;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index afb15822f8f..6b6ea69de34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -485,7 +485,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
     statement.setDeleteAfterLoad(true);
     statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(true);
+    statement.setVerifySchema(validateTsFile.get());
     statement.setAutoCreateDatabase(false);
 
     return executeStatementAndClassifyExceptions(statement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index f3643e4436b..c86af7cfb38 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -2069,10 +2069,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     } else if (ctx.SGLEVEL() != null) {
       
loadTsFileStatement.setDatabaseLevel(Integer.parseInt(ctx.INTEGER_LITERAL().getText()));
     } else if (ctx.VERIFY() != null) {
-      if (!Boolean.parseBoolean(ctx.boolean_literal().getText())) {
-        throw new SemanticException("Load option VERIFY can only be set to 
true.");
-      }
-      loadTsFileStatement.setVerifySchema(true);
+      
loadTsFileStatement.setVerifySchema(Boolean.parseBoolean(ctx.boolean_literal().getText()));
     } else {
       throw new SemanticException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 8cb79658b66..65665f5e3d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -73,6 +73,7 @@ public class ActiveLoadTsFileLoader {
   private final AtomicReference<WrappedThreadPoolExecutor> activeLoadExecutor =
       new AtomicReference<>();
   private final AtomicReference<String> failDir = new AtomicReference<>();
+  private final boolean isVerify = 
IOTDB_CONFIG.isLoadActiveListeningVerifyEnable();
 
   public int getCurrentAllowedPendingSize() {
     return MAX_PENDING_SIZE - pendingQueue.size();
@@ -198,7 +199,7 @@ public class ActiveLoadTsFileLoader {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(filePair.getLeft());
     statement.setDeleteAfterLoad(true);
     statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(true);
+    statement.setVerifySchema(isVerify);
     statement.setAutoCreateDatabase(false);
     return executeStatement(filePair.getRight() ? new 
PipeEnrichedStatement(statement) : statement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index d5672028369..00dca6b61b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -43,6 +43,9 @@ public class LoadTsFileConfigurator {
       case CONVERT_ON_TYPE_MISMATCH_KEY:
         validateConvertOnTypeMismatchParam(value);
         break;
+      case VERIFY_KEY:
+        validateVerifyParam(value);
+        break;
       default:
         throw new SemanticException("Invalid parameter '" + key + "' for LOAD 
TSFILE command.");
     }
@@ -117,6 +120,23 @@ public class LoadTsFileConfigurator {
             CONVERT_ON_TYPE_MISMATCH_KEY, 
String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
   }
 
+  public static final String VERIFY_KEY = "verify";
+  private static final boolean VERIFY_DEFAULT_VALUE = true;
+
+  public static void validateVerifyParam(final String verify) {
+    if (!"true".equalsIgnoreCase(verify) && !"false".equalsIgnoreCase(verify)) 
{
+      throw new SemanticException(
+          String.format(
+              "Given %s value '%s' is not supported, please input a valid 
boolean value.",
+              VERIFY_KEY, verify));
+    }
+  }
+
+  public static boolean parseOrGetDefaultVerify(final Map<String, String> 
loadAttributes) {
+    return Boolean.parseBoolean(
+        loadAttributes.getOrDefault(VERIFY_KEY, 
String.valueOf(VERIFY_DEFAULT_VALUE)));
+  }
+
   private LoadTsFileConfigurator() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index afee3e9a0a2..1f7b7870303 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -238,6 +238,11 @@ public class PipeConnectorConstant {
                   CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE,
                   CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)));
 
+  public static final String CONNECTOR_LOAD_TSFILE_VALIDATION_KEY =
+      "connector.load-tsfile-validation";
+  public static final String SINK_LOAD_TSFILE_VALIDATION_KEY = 
"sink.load-tsfile-validation";
+  public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE = 
true;
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index ed3334b2459..18ef38ad47a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -40,6 +40,8 @@ public abstract class IoTDBClientManager {
   protected final String username;
   protected final String password;
 
+  protected final boolean validateTsFile;
+
   protected final boolean shouldReceiverConvertOnTypeMismatch;
   protected final String loadTsFileStrategy;
 
@@ -60,13 +62,15 @@ public abstract class IoTDBClientManager {
       String password,
       boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      boolean useLeaderCache) {
+      boolean useLeaderCache,
+      final boolean validateTsFile) {
     this.endPointList = endPointList;
     this.username = username;
     this.password = password;
     this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
     this.loadTsFileStrategy = loadTsFileStrategy;
     this.useLeaderCache = useLeaderCache;
+    this.validateTsFile = validateTsFile;
   }
 
   public boolean supportModsIfIsDataNodeReceiver() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 0e4884d9246..13ee064fafb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -71,14 +71,16 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       boolean useLeaderCache,
       String loadBalanceStrategy,
       boolean shouldReceiverConvertOnTypeMismatch,
-      String loadTsFileStrategy) {
+      String loadTsFileStrategy,
+      boolean validateTsFile) {
     super(
         endPoints,
         username,
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        useLeaderCache);
+        useLeaderCache,
+        validateTsFile);
 
     this.useSSL = useSSL;
     this.trustStorePath = trustStorePath;
@@ -211,6 +213,9 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
       params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, 
username);
       params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, 
password);
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
+          Boolean.toString(validateTsFile));
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 46bd38ba45a..4291427a9bc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -27,6 +27,7 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY = 
"loadTsFileStrategy";
   public static final String HANDSHAKE_KEY_USERNAME = "username";
   public static final String HANDSHAKE_KEY_PASSWORD = "password";
+  public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index b64b3df2066..22228f10140 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -91,6 +91,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SET;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -113,6 +115,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
@@ -132,6 +135,7 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   protected String loadBalanceStrategy;
 
   protected String loadTsFileStrategy;
+  protected boolean loadTsFileValidation;
 
   private boolean isRpcCompressionEnabled;
   private final List<PipeCompressor> compressors = new ArrayList<>();
@@ -232,6 +236,10 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             "Load tsfile strategy should be one of %s, but got %s.",
             CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy),
         loadTsFileStrategy);
+    loadTsFileValidation =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_LOAD_TSFILE_VALIDATION_KEY, 
SINK_LOAD_TSFILE_VALIDATION_KEY),
+            CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE);
 
     final int zstdCompressionLevel =
         parameters.getIntOrDefault(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index ffdc9f55b18..e3a5dd21876 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -126,7 +126,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             useLeaderCache,
             loadBalanceStrategy,
             shouldReceiverConvertOnTypeMismatch,
-            loadTsFileStrategy);
+            loadTsFileStrategy,
+            loadTsFileValidation);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -139,7 +140,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       final boolean useLeaderCache,
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
-      final String loadTsFileStrategy);
+      final String loadTsFileStrategy,
+      final boolean validateTsFile);
 
   @Override
   public void handshake() throws Exception {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index b3cbca89f12..ed4b85cfec6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -84,6 +84,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
 
   // Used to determine current strategy is sync or async
   protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new 
AtomicBoolean(false);
+  protected final AtomicBoolean validateTsFile = new AtomicBoolean(true);
 
   @Override
   public IoTDBConnectorRequestVersion getVersion() {
@@ -282,6 +283,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               loadTsFileStrategyString));
     }
 
+    validateTsFile.set(
+        Boolean.parseBoolean(
+            req.getParams()
+                .getOrDefault(
+                    
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true")));
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.


Reply via email to