This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f66794ba229 [To dev/1.3] feat: encode load attributes in active load directories (#16722) (#16758)
f66794ba229 is described below

commit f66794ba229185b3afe826d7f4eaec6132112f50
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 14 18:36:39 2025 +0800

    [To dev/1.3] feat: encode load attributes in active load directories (#16722) (#16758)
---
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |  41 ++++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  10 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  11 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |  10 +-
 .../load/active/ActiveLoadDirScanner.java          |  17 +-
 .../active/ActiveLoadFailedMessageHandler.java     |  45 ++--
 .../load/active/ActiveLoadPathHelper.java          | 256 +++++++++++++++++++++
 .../load/active/ActiveLoadPendingQueue.java        |  45 +++-
 .../load/active/ActiveLoadTsFileLoader.java        | 156 ++++++++-----
 .../storageengine/load/active/ActiveLoadUtil.java  |  37 +--
 .../load/config/LoadTsFileConfigurator.java        |  18 ++
 11 files changed, 527 insertions(+), 119 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index a19612245c5..d8a51b53501 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -998,6 +998,47 @@ public class IoTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testAsyncLoadLocally() throws Exception {
+    registerSchema();
+
+    final long writtenPoint1;
+    // device 0, device 1, sg 0
+    try (final TsFileGenerator generator =
+        new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_0, 
Collections.singletonList(SchemaConfig.MEASUREMENT_00));
+      generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL / 
10_000, false);
+      writtenPoint1 = generator.getTotalNumber();
+    }
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('async'='true','database-level'='2')", 
tmpDir.getAbsolutePath()));
+
+      for (int i = 0; i < 20; i++) {
+        try (final ResultSet resultSet =
+            statement.executeQuery("select count(*) from root.** group by 
level=1,2")) {
+          if (resultSet.next()) {
+            final long sg1Count = 
resultSet.getLong("count(root.sg.test_0.*.*)");
+            Assert.assertEquals(writtenPoint1, sg1Count);
+          } else {
+            Assert.fail("This ResultSet is empty.");
+          }
+        } catch (final Throwable e) {
+          if (i < 19) {
+            Thread.sleep(1000);
+          } else {
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
   @Test
   @Ignore("Load with conversion is currently banned")
   public void testLoadWithConvertOnTypeMismatch() throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 9c04baba6d4..a8654103349 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -83,6 +83,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -475,7 +476,14 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws 
IOException {
-    if (!ActiveLoadUtil.loadFilesToActiveDir(null, absolutePaths, true)) {
+    final Map<String, String> loadAttributes =
+        ActiveLoadPathHelper.buildAttributes(
+            null,
+            shouldConvertDataTypeOnTypeMismatch,
+            validateTsFile.get(),
+            null,
+            shouldMarkAsPipeRequest.get());
+    if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, 
true)) {
       throw new PipeException("Load active listening pipe dir is not set.");
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 765300a65c1..eb36ddeb19c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -65,6 +65,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
@@ -240,7 +241,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   private boolean doAsyncLoad(final Analysis analysis) {
     final long startTime = System.nanoTime();
     try {
-      if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, null, 
isDeleteAfterLoad)) {
+      final Map<String, String> activeLoadAttributes =
+          ActiveLoadPathHelper.buildAttributes(
+              databaseLevel,
+              isConvertOnTypeMismatch,
+              isVerifySchema,
+              tabletConversionThresholdBytes,
+              isGeneratedByPipe);
+      if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
+          tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
         analysis.setFinishQueryAfterAnalyze(true);
         
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
         analysis.setStatement(loadTsFileStatement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 77e2d14840b..88a00d39599 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -51,8 +51,6 @@ public class LoadTsFileStatement extends Statement {
   private boolean isGeneratedByPipe = false;
   private boolean isAsyncLoad = false;
 
-  private Map<String, String> loadAttributes;
-
   private List<File> tsFiles;
   private List<TsFileResource> resources;
   private List<Long> writePointCountList;
@@ -212,15 +210,14 @@ public class LoadTsFileStatement extends Statement {
   }
 
   public void setLoadAttributes(final Map<String, String> loadAttributes) {
-    this.loadAttributes = loadAttributes;
-    initAttributes();
+    initAttributes(loadAttributes);
   }
 
   public boolean isAsyncLoad() {
     return isAsyncLoad;
   }
 
-  private void initAttributes() {
+  private void initAttributes(final Map<String, String> loadAttributes) {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
     this.deleteAfterLoad = 
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
     this.convertOnTypeMismatch =
@@ -229,6 +226,9 @@ public class LoadTsFileStatement extends Statement {
         
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
     this.verifySchema = 
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
     this.isAsyncLoad = 
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
+    if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes)) 
{
+      markIsGeneratedByPipe();
+    }
   }
 
   public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> 
isMiniTsFile) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index d029fbdecc8..f4c705c17b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -39,6 +39,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,8 +102,9 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
 
       final boolean isGeneratedByPipe =
           listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+      final File listeningDirFile = new File(listeningDir);
       try (final Stream<File> fileStream =
-          FileUtils.streamFiles(new File(listeningDir), true, (String[]) 
null)) {
+          FileUtils.streamFiles(listeningDirFile, true, (String[]) null)) {
         try {
           fileStream
               .filter(file -> 
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
@@ -115,7 +117,18 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
               .filter(this::isTsFileCompleted)
               .limit(currentAllowedPendingSize)
               .forEach(
-                  file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, 
isGeneratedByPipe));
+                  filePath -> {
+                    final File tsFile = new File(filePath);
+                    final Map<String, String> attributes =
+                        ActiveLoadPathHelper.parseAttributes(tsFile, 
listeningDirFile);
+
+                    final File parentFile = tsFile.getParentFile();
+
+                    activeLoadTsFileLoader.tryTriggerTsFileLoad(
+                        tsFile.getAbsolutePath(),
+                        listeningDirFile.getAbsolutePath(),
+                        isGeneratedByPipe);
+                  });
         } catch (UncheckedIOException e) {
           LOGGER.debug("The file has been deleted. Ignore this exception.");
         } catch (final Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index be66e591d5d..c2d9af94725 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.load.active;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,72 +40,72 @@ public class ActiveLoadFailedMessageHandler {
               // system is memory constrains
               put(
                   "memory",
-                  filePair ->
+                  entry ->
                       LOGGER.info(
                           "Rejecting auto load tsfile {} (isGeneratedByPipe = 
{}) due to memory constraints, will retry later.",
-                          filePair.getLeft(),
-                          filePair.getRight()));
+                          entry.getFile(),
+                          entry.isGeneratedByPipe()));
               // system is read only
               put(
                   "read only",
-                  filePair ->
+                  entry ->
                       LOGGER.info(
                           "Rejecting auto load tsfile {} (isGeneratedByPipe = 
{}) due to the system is read only, will retry later.",
-                          filePair.getLeft(),
-                          filePair.getRight()));
+                          entry.getFile(),
+                          entry.isGeneratedByPipe()));
               // Timed out to wait for procedure return. The procedure is 
still running.
               put(
                   "procedure return",
-                  filePair ->
+                  entry ->
                       LOGGER.info(
                           "Rejecting auto load tsfile {} (isGeneratedByPipe = 
{}) due to time out to wait for procedure return, will retry later.",
-                          filePair.getLeft(),
-                          filePair.getRight()));
+                          entry.getFile(),
+                          entry.isGeneratedByPipe()));
               // DataNode is not enough, please register more.
               put(
                   "not enough",
-                  filePair ->
+                  entry ->
                       LOGGER.info(
                           "Rejecting auto load tsfile {} (isGeneratedByPipe = 
{}) due to the datanode is not enough, will retry later.",
-                          filePair.getLeft(),
-                          filePair.getRight()));
+                          entry.getFile(),
+                          entry.isGeneratedByPipe()));
               // Fail to connect to any config node. Please check status of 
ConfigNodes or logs of
               // connected DataNode.
               put(
                   "any config node",
-                  filePair ->
+                  entry ->
                       LOGGER.info(
                           "Rejecting auto load tsfile {} (isGeneratedByPipe = 
{}) due to fail to connect to any config node, will retry later.",
-                          filePair.getLeft(),
-                          filePair.getRight()));
+                          entry.getFile(),
+                          entry.isGeneratedByPipe()));
               // Current query is time out, query start time is 1729653161797, 
ddl is
               // -3046040214706, current time is 1729653184210, please check 
your statement or
               // modify timeout parameter
               put(
                   "query is time out",
-                  filePair ->
+                  entry ->
                       LOGGER.info(
                           "Rejecting auto load tsfile {} (isGeneratedByPipe = 
{}) due to current query is time out, will retry later.",
-                          filePair.getLeft(),
-                          filePair.getRight()));
+                          entry.getFile(),
+                          entry.isGeneratedByPipe()));
             }
           });
 
   @FunctionalInterface
   private interface ExceptionMessageHandler {
-    void handle(final Pair<String, Boolean> filePair);
+    void handle(final ActiveLoadPendingQueue.ActiveLoadEntry activeLoadEntry);
   }
 
   public static boolean isExceptionMessageShouldRetry(
-      final Pair<String, Boolean> filePair, final String message) {
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String 
message) {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
-      EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(filePair);
+      EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(entry);
       return true;
     }
 
     for (String key : EXCEPTION_MESSAGE_HANDLER_MAP.keySet()) {
       if (message != null && message.contains(key)) {
-        EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(filePair);
+        EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(entry);
         return true;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
new file mode 100644
index 00000000000..965f2941dc6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utility methods for encoding and decoding load attributes into directory 
structures. */
+public final class ActiveLoadPathHelper {
+
+  private static final String SEGMENT_SEPARATOR = "-";
+
+  private static final List<String> KEY_ORDER =
+      Collections.unmodifiableList(
+          Arrays.asList(
+              LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
+              LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+              LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+              LoadTsFileConfigurator.VERIFY_KEY,
+              LoadTsFileConfigurator.PIPE_GENERATED_KEY));
+
+  private ActiveLoadPathHelper() {
+    throw new IllegalStateException("Utility class");
+  }
+
+  public static Map<String, String> buildAttributes(
+      final Integer databaseLevel,
+      final Boolean convertOnTypeMismatch,
+      final Boolean verify,
+      final Long tabletConversionThresholdBytes,
+      final Boolean pipeGenerated) {
+    final Map<String, String> attributes = new LinkedHashMap<>();
+
+    if (Objects.nonNull(databaseLevel)) {
+      attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY, 
databaseLevel.toString());
+    }
+
+    if (Objects.nonNull(convertOnTypeMismatch)) {
+      attributes.put(
+          LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+          Boolean.toString(convertOnTypeMismatch));
+    }
+
+    if (Objects.nonNull(tabletConversionThresholdBytes)) {
+      attributes.put(
+          LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+          tabletConversionThresholdBytes.toString());
+    }
+
+    if (Objects.nonNull(verify)) {
+      attributes.put(LoadTsFileConfigurator.VERIFY_KEY, 
Boolean.toString(verify));
+    }
+
+    if (Objects.nonNull(pipeGenerated) && pipeGenerated) {
+      attributes.put(LoadTsFileConfigurator.PIPE_GENERATED_KEY, 
Boolean.TRUE.toString());
+    }
+    return attributes;
+  }
+
+  public static File resolveTargetDir(final File baseDir, final Map<String, 
String> attributes) {
+    File current = baseDir;
+    for (final String key : KEY_ORDER) {
+      final String value = attributes.get(key);
+      if (value == null) {
+        continue;
+      }
+      current = new File(current, formatSegment(key, value));
+    }
+    return current;
+  }
+
+  public static Map<String, String> parseAttributes(final File file, final 
File pendingDir) {
+    if (file == null) {
+      return Collections.emptyMap();
+    }
+
+    final Map<String, String> attributes = new HashMap<>();
+    File current = file.getParentFile();
+    while (current != null) {
+      final String dirName = current.getName();
+      if (pendingDir != null && current.equals(pendingDir)) {
+        break;
+      }
+      for (final String key : KEY_ORDER) {
+        final String prefix = key + SEGMENT_SEPARATOR;
+        if (dirName.startsWith(prefix)) {
+          extractAndValidateAttributeValue(key, dirName, prefix.length())
+              .ifPresent(value -> attributes.putIfAbsent(key, value));
+          break;
+        }
+      }
+      current = current.getParentFile();
+    }
+    return attributes;
+  }
+
+  public static File findPendingDirectory(final File file) {
+    if (file == null) {
+      return null;
+    }
+    String[] dirs = 
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+    File current = file;
+    while (current != null) {
+      for (final String dir : dirs) {
+        if (current.isDirectory() && current.getAbsolutePath().equals(dir)) {
+          return current;
+        }
+      }
+      current = current.getParentFile();
+    }
+    return null;
+  }
+
+  public static void applyAttributesToStatement(
+      final Map<String, String> attributes,
+      final LoadTsFileStatement statement,
+      final boolean defaultVerify) {
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY))
+        .ifPresent(
+            level -> {
+              try {
+                statement.setDatabaseLevel(Integer.parseInt(level));
+              } catch (final NumberFormatException ignored) {
+                // keep the default when parsing fails
+              }
+            });
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY))
+        .ifPresent(value -> 
statement.setConvertOnTypeMismatch(Boolean.parseBoolean(value)));
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY))
+        .ifPresent(
+            threshold -> {
+              try {
+                
statement.setTabletConversionThresholdBytes(Long.parseLong(threshold));
+              } catch (final NumberFormatException ignored) {
+                // keep the default when parsing fails
+              }
+            });
+
+    if (attributes.containsKey(LoadTsFileConfigurator.VERIFY_KEY)) {
+      statement.setVerifySchema(
+          
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.VERIFY_KEY)));
+    } else {
+      statement.setVerifySchema(defaultVerify);
+    }
+
+    if (attributes.containsKey(LoadTsFileConfigurator.PIPE_GENERATED_KEY)
+        && 
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.PIPE_GENERATED_KEY)))
 {
+      statement.markIsGeneratedByPipe();
+    }
+  }
+
+  private static String formatSegment(final String key, final String value) {
+    return key + SEGMENT_SEPARATOR + encodeValue(value);
+  }
+
+  private static String encodeValue(final String value) {
+    try {
+      return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
+    } catch (final UnsupportedEncodingException e) {
+      // UTF-8 should always be supported; fallback to raw value when 
unexpected
+      return value;
+    }
+  }
+
+  private static Optional<String> extractAndValidateAttributeValue(
+      final String key, final String dirName, final int prefixLength) {
+    if (dirName.length() <= prefixLength) {
+      return Optional.empty();
+    }
+
+    final String encodedValue = dirName.substring(prefixLength);
+    final String decodedValue = decodeValue(encodedValue);
+    try {
+      validateAttributeValue(key, decodedValue);
+      return Optional.of(decodedValue);
+    } catch (final SemanticException e) {
+      return Optional.empty();
+    }
+  }
+
+  private static void validateAttributeValue(final String key, final String 
value) {
+    switch (key) {
+      case LoadTsFileConfigurator.DATABASE_LEVEL_KEY:
+        LoadTsFileConfigurator.validateDatabaseLevelParam(value);
+        break;
+      case LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY:
+        LoadTsFileConfigurator.validateConvertOnTypeMismatchParam(value);
+        break;
+      case LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY:
+        validateTabletConversionThreshold(value);
+        break;
+      case LoadTsFileConfigurator.VERIFY_KEY:
+        LoadTsFileConfigurator.validateVerifyParam(value);
+        break;
+      default:
+        LoadTsFileConfigurator.validateParameters(key, value);
+    }
+  }
+
+  private static void validateTabletConversionThreshold(final String value) {
+    try {
+      final long threshold = Long.parseLong(value);
+      if (threshold < 0) {
+        throw new SemanticException(
+            "Tablet conversion threshold must be a non-negative long value.");
+      }
+    } catch (final NumberFormatException e) {
+      throw new SemanticException(
+          String.format("Tablet conversion threshold '%s' is not a valid long 
value.", value));
+    }
+  }
+
+  private static String decodeValue(final String value) {
+    try {
+      return URLDecoder.decode(value, StandardCharsets.UTF_8.toString());
+    } catch (final UnsupportedEncodingException e) {
+      return value;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 6c2b2cd41f5..bb611b842be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.storageengine.load.active;
 
 import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
 
-import org.apache.tsfile.utils.Pair;
-
 import java.util.HashSet;
 import java.util.Queue;
 import java.util.Set;
@@ -31,13 +29,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public class ActiveLoadPendingQueue {
 
   private final Set<String> pendingFileSet = new HashSet<>();
-  private final Queue<Pair<String, Boolean>> pendingFileQueue = new 
ConcurrentLinkedQueue<>();
+  private final Queue<ActiveLoadEntry> pendingFileQueue = new 
ConcurrentLinkedQueue<>();
 
   private final Set<String> loadingFileSet = new HashSet<>();
 
-  public synchronized boolean enqueue(final String file, final boolean 
isGeneratedByPipe) {
+  public synchronized boolean enqueue(
+      final String file, final String pendingDir, final boolean 
isGeneratedByPipe) {
     if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
-      pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
+      pendingFileQueue.offer(new ActiveLoadEntry(file, pendingDir, 
isGeneratedByPipe));
 
       
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
       return true;
@@ -45,16 +44,16 @@ public class ActiveLoadPendingQueue {
     return false;
   }
 
-  public synchronized Pair<String, Boolean> dequeueFromPending() {
-    final Pair<String, Boolean> pair = pendingFileQueue.poll();
-    if (pair != null) {
-      pendingFileSet.remove(pair.left);
-      loadingFileSet.add(pair.left);
+  public synchronized ActiveLoadEntry dequeueFromPending() {
+    final ActiveLoadEntry entry = pendingFileQueue.poll();
+    if (entry != null) {
+      pendingFileSet.remove(entry.getFile());
+      loadingFileSet.add(entry.getFile());
 
       
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
       
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
     }
-    return pair;
+    return entry;
   }
 
   public synchronized void removeFromLoading(final String file) {
@@ -74,4 +73,28 @@ public class ActiveLoadPendingQueue {
   public boolean isEmpty() {
     return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
   }
+
+  public static class ActiveLoadEntry {
+    private final String file;
+    private final String pendingDir;
+    private final boolean isGeneratedByPipe;
+
+    public ActiveLoadEntry(String file, String pendingDir, boolean 
isGeneratedByPipe) {
+      this.file = file;
+      this.pendingDir = pendingDir;
+      this.isGeneratedByPipe = isGeneratedByPipe;
+    }
+
+    public String getFile() {
+      return file;
+    }
+
+    public String getPendingDir() {
+      return pendingDir;
+    }
+
+    public boolean isGeneratedByPipe() {
+      return isGeneratedByPipe;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index b1f950321dd..0aeb83c959e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,10 +24,13 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -41,7 +44,6 @@ import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetr
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.time.ZoneId;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -67,6 +70,8 @@ public class ActiveLoadTsFileLoader {
 
   private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
+  private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
   private static final int MAX_PENDING_SIZE = 1000;
   private final ActiveLoadPendingQueue pendingQueue = new ActiveLoadPendingQueue();
 
@@ -79,12 +84,13 @@ public class ActiveLoadTsFileLoader {
     return MAX_PENDING_SIZE - pendingQueue.size();
   }
 
-  public void tryTriggerTsFileLoad(String absolutePath, boolean isGeneratedByPipe) {
+  public void tryTriggerTsFileLoad(
+      String absolutePath, String pendingDir, boolean isGeneratedByPipe) {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
       return;
     }
 
-    if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe)) {
+    if (pendingQueue.enqueue(absolutePath, pendingDir, isGeneratedByPipe)) {
       initFailDirIfNecessary();
       adjustExecutorIfNecessary();
     }
@@ -149,42 +155,55 @@ public class ActiveLoadTsFileLoader {
   }
 
   private void tryLoadPendingTsFiles() {
-    while (true) {
-      final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
-      if (!filePair.isPresent()) {
-        return;
-      }
+    final IClientSession session =
+        new InternalClientSession(
+            String.format(
+                "%s_%s",
+                ActiveLoadTsFileLoader.class.getSimpleName(), Thread.currentThread().getName()));
+    session.setUsername(AuthorityChecker.SUPER_USER);
+    session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+    session.setZoneId(ZoneId.systemDefault());
+
+    try {
+      while (true) {
+        final Optional<ActiveLoadPendingQueue.ActiveLoadEntry> loadEntry = tryGetNextPendingFile();
+        if (!loadEntry.isPresent()) {
+          return;
+        }
 
-      try {
-        final TSStatus result = loadTsFile(filePair.get());
-        if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          LOGGER.info(
-              "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
-              filePair.get().getLeft(),
-              filePair.get().getRight());
-        } else {
-          handleLoadFailure(filePair.get(), result);
+        try {
+          final TSStatus result = loadTsFile(loadEntry.get(), session);
+          if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+            LOGGER.info(
+                "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
+                loadEntry.get().getFile(),
+                loadEntry.get().isGeneratedByPipe());
+          } else {
+            handleLoadFailure(loadEntry.get(), result);
+          }
+        } catch (final FileNotFoundException e) {
+          handleFileNotFoundException(loadEntry.get());
+        } catch (final Exception e) {
+          handleOtherException(loadEntry.get(), e);
+        } finally {
+          pendingQueue.removeFromLoading(loadEntry.get().getFile());
         }
-      } catch (final FileNotFoundException e) {
-        handleFileNotFoundException(filePair.get());
-      } catch (final Exception e) {
-        handleOtherException(filePair.get(), e);
-      } finally {
-        pendingQueue.removeFromLoading(filePair.get().getLeft());
       }
+    } finally {
+      SESSION_MANAGER.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
 
-  private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
+  private Optional<ActiveLoadPendingQueue.ActiveLoadEntry> tryGetNextPendingFile() {
     final long maxRetryTimes =
         Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() << 1);
     long currentRetryTimes = 0;
 
     while (true) {
-      final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
-      if (Objects.nonNull(filePair)) {
-        return Optional.of(filePair);
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry = pendingQueue.dequeueFromPending();
+      if (Objects.nonNull(entry)) {
+        return Optional.of(entry);
       }
 
       LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
@@ -195,58 +214,75 @@ public class ActiveLoadTsFileLoader {
     }
   }
 
-  private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws FileNotFoundException {
-    final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft());
+  private TSStatus loadTsFile(
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession session)
+      throws FileNotFoundException {
+    final File tsFile = new File(entry.getFile());
+    final LoadTsFileStatement statement = new LoadTsFileStatement(entry.getFile());
+
     statement.setDeleteAfterLoad(true);
-    statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(isVerify);
     statement.setAutoCreateDatabase(
         IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
-    return executeStatement(filePair.getRight() ? new PipeEnrichedStatement(statement) : statement);
+
+    final File pendingDir =
+        entry.getPendingDir() == null
+            ? ActiveLoadPathHelper.findPendingDirectory(tsFile)
+            : new File(entry.getPendingDir());
+    final Map<String, String> attributes = ActiveLoadPathHelper.parseAttributes(tsFile, pendingDir);
+    ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, isVerify);
+
+    return executeStatement(
+        entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) : statement, session);
   }
 
-  private TSStatus executeStatement(final Statement statement) {
-    return Coordinator.getInstance()
-        .executeForTreeModel(
-            statement,
-            SessionManager.getInstance().requestQueryId(),
-            new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
-            "",
-            ClusterPartitionFetcher.getInstance(),
-            ClusterSchemaFetcher.getInstance(),
-            IOTDB_CONFIG.getQueryTimeoutThreshold(),
-            false)
-        .status;
+  private TSStatus executeStatement(final Statement statement, final IClientSession session) {
+    SESSION_MANAGER.registerSession(session);
+    try {
+      return Coordinator.getInstance()
+          .executeForTreeModel(
+              statement,
+              SessionManager.getInstance().requestQueryId(),
+              new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
+              "",
+              ClusterPartitionFetcher.getInstance(),
+              ClusterSchemaFetcher.getInstance(),
+              IOTDB_CONFIG.getQueryTimeoutThreshold(),
+              false)
+          .status;
+    } finally {
+      SESSION_MANAGER.removeCurrSession();
+    }
   }
 
-  private void handleLoadFailure(final Pair<String, Boolean> filePair, final TSStatus status) {
-    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(
-        filePair, status.getMessage())) {
+  private void handleLoadFailure(
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
+    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, status.getMessage())) {
       LOGGER.warn(
           "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.",
-          filePair.getLeft(),
-          filePair.getRight(),
+          entry.getFile(),
+          entry.isGeneratedByPipe(),
           status);
-      removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+      removeFileAndResourceAndModsToFailDir(entry.getFile());
     }
   }
 
-  private void handleFileNotFoundException(final Pair<String, Boolean> filePair) {
+  private void handleFileNotFoundException(final ActiveLoadPendingQueue.ActiveLoadEntry entry) {
     LOGGER.warn(
         "Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file not found, will skip this file.",
-        filePair.getLeft(),
-        filePair.getRight());
-    removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+        entry.getFile(),
+        entry.isGeneratedByPipe());
+    removeFileAndResourceAndModsToFailDir(entry.getFile());
   }
 
-  private void handleOtherException(final Pair<String, Boolean> filePair, final Exception e) {
-    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(filePair, e.getMessage())) {
+  private void handleOtherException(
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final Exception e) {
+    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, e.getMessage())) {
       LOGGER.warn(
           "Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of an unexpected exception. File will be moved to fail directory.",
-          filePair.getLeft(),
-          filePair.getRight(),
+          entry.getFile(),
+          entry.isGeneratedByPipe(),
           e);
-      removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+      removeFileAndResourceAndModsToFailDir(entry.getFile());
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
index 5bfa9b71105..1ab573a14cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
@@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
@@ -45,14 +47,16 @@ public class ActiveLoadUtil {
   private static volatile ILoadDiskSelector loadDiskSelector = updateLoadDiskSelector();
 
   public static boolean loadTsFileAsyncToActiveDir(
-      final List<File> tsFiles, final String dataBaseName, final boolean isDeleteAfterLoad) {
+      final List<File> tsFiles,
+      final Map<String, String> loadAttributes,
+      final boolean isDeleteAfterLoad) {
     if (tsFiles == null || tsFiles.isEmpty()) {
       return true;
     }
 
     try {
       for (File file : tsFiles) {
-        if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) {
+        if (!loadTsFilesToActiveDir(loadAttributes, file, isDeleteAfterLoad)) {
           return false;
         }
       }
@@ -65,7 +69,7 @@ public class ActiveLoadUtil {
   }
 
   private static boolean loadTsFilesToActiveDir(
-      final String dataBaseName, final File file, final boolean isDeleteAfterLoad)
+      final Map<String, String> loadAttributes, final File file, final boolean isDeleteAfterLoad)
       throws IOException {
     if (file == null) {
       return true;
@@ -84,12 +88,9 @@ public class ActiveLoadUtil {
       LOGGER.warn("Load active listening dir is not set.");
       return false;
     }
-    final File targetDir;
-    if (Objects.nonNull(dataBaseName)) {
-      targetDir = new File(targetFilePath, dataBaseName);
-    } else {
-      targetDir = targetFilePath;
-    }
+    final Map<String, String> attributes =
+        Objects.nonNull(loadAttributes) ? loadAttributes : Collections.emptyMap();
+    final File targetDir = ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
 
     loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad);
     loadTsFileAsyncToTargetDir(
@@ -100,7 +101,9 @@ public class ActiveLoadUtil {
   }
 
   public static boolean loadFilesToActiveDir(
-      final String dataBaseName, final List<String> files, final boolean isDeleteAfterLoad)
+      final Map<String, String> loadAttributes,
+      final List<String> files,
+      final boolean isDeleteAfterLoad)
       throws IOException {
     if (files == null || files.isEmpty()) {
       return true;
@@ -120,12 +123,9 @@ public class ActiveLoadUtil {
       LOGGER.warn("Load active listening dir is not set.");
       return false;
     }
-    final File targetDir;
-    if (Objects.nonNull(dataBaseName)) {
-      targetDir = new File(targetFilePath, dataBaseName);
-    } else {
-      targetDir = targetFilePath;
-    }
+    final Map<String, String> attributes =
+        Objects.nonNull(loadAttributes) ? loadAttributes : Collections.emptyMap();
+    final File targetDir = ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
 
     for (final String file : files) {
       loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad);
@@ -138,6 +138,11 @@ public class ActiveLoadUtil {
     if (!file.exists()) {
       return;
     }
+    if (!targetDir.exists() && !targetDir.mkdirs()) {
+      if (!targetDir.exists()) {
+        throw new IOException("Failed to create target directory " + targetDir.getAbsolutePath());
+      }
+    }
     RetryUtils.retryOnException(
         () -> {
           if (isDeleteAfterLoad) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index b1d2472603c..6ab188915f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -48,6 +48,9 @@ public class LoadTsFileConfigurator {
       case VERIFY_KEY:
         validateVerifyParam(value);
         break;
+      case PIPE_GENERATED_KEY:
+        validatePipeGeneratedParam(value);
+        break;
       case ASYNC_LOAD_KEY:
         validateAsyncLoadParam(value);
         break;
@@ -155,6 +158,21 @@ public class LoadTsFileConfigurator {
         loadAttributes.getOrDefault(VERIFY_KEY, String.valueOf(VERIFY_DEFAULT_VALUE)));
   }
 
+  public static final String PIPE_GENERATED_KEY = "pipe-generated";
+
+  public static void validatePipeGeneratedParam(final String pipeGenerated) {
+    if (!"true".equalsIgnoreCase(pipeGenerated) && !"false".equalsIgnoreCase(pipeGenerated)) {
+      throw new SemanticException(
+          String.format(
+              "Given %s value '%s' is not supported, please input a valid boolean value.",
+              PIPE_GENERATED_KEY, pipeGenerated));
+    }
+  }
+
+  public static boolean parseOrGetDefaultPipeGenerated(final Map<String, String> loadAttributes) {
+    return Boolean.parseBoolean(loadAttributes.getOrDefault(PIPE_GENERATED_KEY, "false"));
+  }
+
   public static final String ASYNC_LOAD_KEY = "async";
   private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;
 


Reply via email to