This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 91e48f07f63 feat: encode load attributes in active load directories 
(#16722)
91e48f07f63 is described below

commit 91e48f07f63a1161b46487a281858eb1b4b63479
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 14 15:14:37 2025 +0800

    feat: encode load attributes in active load directories (#16722)
    
    * feat: encode load attributes in active load directories
    
    * spotless
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * update it
    
    * update it
    
    * fix
    
    * update
    
    * update
---
 .../iotdb/db/it/IoTDBLoadTsFileWithModIT.java      |  60 ++++-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  12 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  13 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |  16 +-
 .../load/active/ActiveLoadDirScanner.java          |  23 +-
 .../load/active/ActiveLoadPathHelper.java          | 282 +++++++++++++++++++++
 .../load/active/ActiveLoadPendingQueue.java        |  17 +-
 .../load/active/ActiveLoadTsFileLoader.java        |  36 +--
 .../storageengine/load/active/ActiveLoadUtil.java  |  37 +--
 .../load/config/LoadTsFileConfigurator.java        |  18 ++
 10 files changed, 460 insertions(+), 54 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
index 9d0a54f1672..5578023e4f5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
@@ -85,7 +85,7 @@ public class IoTDBLoadTsFileWithModIT {
     // write mods file
     resource
         .getExclusiveModFile()
-        .write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1"), 
1, 2));
+        .write(new TreeDeletionEntry(new 
MeasurementPath("root.test.d1.de.s1"), 1, 2));
     resource.getExclusiveModFile().close();
   }
 
@@ -93,7 +93,7 @@ public class IoTDBLoadTsFileWithModIT {
       throws IOException, DataRegionException, WriteProcessException, 
IllegalPathException {
     TsFileResource resource = generateFile();
     ModificationFileV1 oldModFile = ModificationFileV1.getNormalMods(resource);
-    oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.s1"), 
Long.MAX_VALUE, 1, 2));
+    oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.de.s1"), 
Long.MAX_VALUE, 1, 2));
     oldModFile.close();
   }
 
@@ -102,11 +102,11 @@ public class IoTDBLoadTsFileWithModIT {
     File tsfile = new File(tmpDir, "1-1-0-0.tsfile");
     try (TsFileWriter writer = new TsFileWriter(tsfile)) {
       writer.registerAlignedTimeseries(
-          "root.test.d1",
+          "root.test.d1.de",
           Collections.singletonList(new MeasurementSchema("s1", 
TSDataType.BOOLEAN)));
       Tablet tablet =
           new Tablet(
-              "root.test.d1",
+              "root.test.d1.de",
               Collections.singletonList(new MeasurementSchema("s1", 
TSDataType.BOOLEAN)));
       for (int i = 0; i < 5; i++) {
         tablet.addTimestamp(i, i);
@@ -138,13 +138,61 @@ public class IoTDBLoadTsFileWithModIT {
       statement.execute(String.format("load \'%s\'", 
tmpDir.getAbsolutePath()));
 
       try (final ResultSet resultSet =
-          statement.executeQuery("select count(s1) as c from root.test.d1")) {
+          statement.executeQuery("select count(s1) as c from 
root.test.d1.de")) {
         Assert.assertTrue(resultSet.next());
         Assert.assertEquals(3, resultSet.getLong("c"));
       }
     }
   }
 
+  @Test
+  public void testWithNewModFileAndLoadAttributes()
+      throws SQLException,
+          IOException,
+          DataRegionException,
+          WriteProcessException,
+          IllegalPathException {
+    generateFileWithNewModFile();
+    final String databaseName = "root.test.d1";
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+
+      statement.execute(
+          String.format(
+              "load \'%s\' with ("
+                  + "'database-name'='%s',"
+                  + "'database-level'='2',"
+                  + "'verify'='true',"
+                  + "'on-success'='none',"
+                  + "'async'='true')",
+              tmpDir.getAbsolutePath(), databaseName));
+
+      boolean databaseFound = false;
+      out:
+      for (int i = 0; i < 10; i++) {
+        try (final ResultSet resultSet = statement.executeQuery("show 
databases")) {
+          while (resultSet.next()) {
+            final String currentDatabase = resultSet.getString(1);
+            if (databaseName.equalsIgnoreCase(currentDatabase)) {
+              databaseFound = true;
+              break out;
+            }
+          }
+
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+      Assert.assertTrue(
+          "The `database-level` parameter is not working; the generated 
database does not contain 'root.test.d1'.",
+          databaseFound);
+    }
+  }
+
   @Test
   public void testWithOldModFile()
       throws SQLException,
@@ -159,7 +207,7 @@ public class IoTDBLoadTsFileWithModIT {
       statement.execute(String.format("load \'%s\'", 
tmpDir.getAbsolutePath()));
 
       try (final ResultSet resultSet =
-          statement.executeQuery("select count(s1) as c from root.test.d1")) {
+          statement.executeQuery("select count(s1) as c from 
root.test.d1.de")) {
         Assert.assertTrue(resultSet.next());
         Assert.assertEquals(3, resultSet.getLong("c"));
         Assert.assertTrue(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8a9ceb61c93..652530d11b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -100,6 +100,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -570,7 +571,16 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   private TSStatus loadTsFileAsync(final String dataBaseName, final 
List<String> absolutePaths)
       throws IOException {
-    if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths, 
true)) {
+    final Map<String, String> loadAttributes =
+        ActiveLoadPathHelper.buildAttributes(
+            dataBaseName,
+            null,
+            shouldConvertDataTypeOnTypeMismatch,
+            validateTsFile.get(),
+            null,
+            shouldMarkAsPipeRequest.get());
+
+    if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, 
true)) {
       throw new PipeException("Load active listening pipe dir is not set.");
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 4fe119566d7..076b529ab91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -281,7 +282,17 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       } else {
         databaseName = null;
       }
-      if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, databaseName, 
isDeleteAfterLoad)) {
+      final Map<String, String> activeLoadAttributes =
+          ActiveLoadPathHelper.buildAttributes(
+              databaseName,
+              databaseLevel,
+              isConvertOnTypeMismatch,
+              isVerifySchema,
+              tabletConversionThresholdBytes,
+              isGeneratedByPipe);
+
+      if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
+          tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
         analysis.setFinishQueryAfterAnalyze(true);
         setRealStatement(analysis);
         return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index dbc4cb7b64d..2d74925971c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -48,6 +48,7 @@ import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurat
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
+import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.PIPE_GENERATED_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY;
 
 public class LoadTsFileStatement extends Statement {
@@ -63,8 +64,6 @@ public class LoadTsFileStatement extends Statement {
   private boolean isGeneratedByPipe = false;
   private boolean isAsyncLoad = false;
 
-  private Map<String, String> loadAttributes;
-
   private List<File> tsFiles;
   private List<Boolean> isTableModel;
   private List<TsFileResource> resources;
@@ -245,15 +244,14 @@ public class LoadTsFileStatement extends Statement {
   }
 
   public void setLoadAttributes(final Map<String, String> loadAttributes) {
-    this.loadAttributes = loadAttributes;
-    initAttributes();
+    initAttributes(loadAttributes);
   }
 
   public boolean isAsyncLoad() {
     return isAsyncLoad;
   }
 
-  private void initAttributes() {
+  private void initAttributes(final Map<String, String> loadAttributes) {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
     this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
     this.deleteAfterLoad = 
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
@@ -263,6 +261,9 @@ public class LoadTsFileStatement extends Statement {
         
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
     this.verifySchema = 
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
     this.isAsyncLoad = 
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
+    if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes)) 
{
+      markIsGeneratedByPipe();
+    }
   }
 
   public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> 
isMiniTsFile) {
@@ -314,7 +315,7 @@ public class LoadTsFileStatement extends Statement {
   @Override
   public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
toRelationalStatement(
       MPPQueryContext context) {
-    loadAttributes = new HashMap<>();
+    final Map<String, String> loadAttributes = new HashMap<>();
 
     loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel));
     if (database != null) {
@@ -326,6 +327,9 @@ public class LoadTsFileStatement extends Statement {
     loadAttributes.put(
         TABLET_CONVERSION_THRESHOLD_KEY, 
String.valueOf(tabletConversionThresholdBytes));
     loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad));
+    if (isGeneratedByPipe) {
+      loadAttributes.put(PIPE_GENERATED_KEY, String.valueOf(true));
+    }
 
     return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 5ac33dfbdd4..470cf702b22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -39,6 +39,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -117,13 +118,23 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
               .filter(this::isTsFileCompleted)
               .limit(currentAllowedPendingSize)
               .forEach(
-                  file -> {
-                    final File parentFile = new File(file).getParentFile();
+                  filePath -> {
+                    final File tsFile = new File(filePath);
+                    final Map<String, String> attributes =
+                        ActiveLoadPathHelper.parseAttributes(tsFile, 
listeningDirFile);
+
+                    final File parentFile = tsFile.getParentFile();
+                    final boolean isTableModel =
+                        ActiveLoadPathHelper.containsDatabaseName(attributes)
+                            || (parentFile != null
+                                && !Objects.equals(
+                                    parentFile.getAbsoluteFile(),
+                                    listeningDirFile.getAbsoluteFile()));
+
                     activeLoadTsFileLoader.tryTriggerTsFileLoad(
-                        file,
-                        parentFile != null
-                            && !Objects.equals(
-                                parentFile.getAbsoluteFile(), 
listeningDirFile.getAbsoluteFile()),
+                        tsFile.getAbsolutePath(),
+                        listeningDirFile.getAbsolutePath(),
+                        isTableModel,
                         isGeneratedByPipe);
                   });
         } catch (UncheckedIOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
new file mode 100644
index 00000000000..7c83f20f369
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utility methods for encoding and decoding load attributes into directory 
structures. */
+public final class ActiveLoadPathHelper {
+
+  private static final String SEGMENT_SEPARATOR = "-";
+
+  private static final List<String> KEY_ORDER =
+      Collections.unmodifiableList(
+          Arrays.asList(
+              LoadTsFileConfigurator.DATABASE_NAME_KEY,
+              LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
+              LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+              LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+              LoadTsFileConfigurator.VERIFY_KEY,
+              LoadTsFileConfigurator.DATABASE_KEY,
+              LoadTsFileConfigurator.PIPE_GENERATED_KEY));
+
+  private ActiveLoadPathHelper() {
+    throw new IllegalStateException("Utility class");
+  }
+
+  public static Map<String, String> buildAttributes(
+      final String databaseName,
+      final Integer databaseLevel,
+      final Boolean convertOnTypeMismatch,
+      final Boolean verify,
+      final Long tabletConversionThresholdBytes,
+      final Boolean pipeGenerated) {
+    final Map<String, String> attributes = new LinkedHashMap<>();
+    if (Objects.nonNull(databaseName) && !databaseName.isEmpty()) {
+      attributes.put(LoadTsFileConfigurator.DATABASE_NAME_KEY, databaseName);
+    }
+
+    if (Objects.nonNull(databaseLevel)) {
+      attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY, 
databaseLevel.toString());
+    }
+
+    if (Objects.nonNull(convertOnTypeMismatch)) {
+      attributes.put(
+          LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+          Boolean.toString(convertOnTypeMismatch));
+    }
+
+    if (Objects.nonNull(tabletConversionThresholdBytes)) {
+      attributes.put(
+          LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+          tabletConversionThresholdBytes.toString());
+    }
+
+    if (Objects.nonNull(verify)) {
+      attributes.put(LoadTsFileConfigurator.VERIFY_KEY, 
Boolean.toString(verify));
+    }
+
+    if (Objects.nonNull(pipeGenerated) && pipeGenerated) {
+      attributes.put(LoadTsFileConfigurator.PIPE_GENERATED_KEY, 
Boolean.TRUE.toString());
+    }
+    return attributes;
+  }
+
+  public static File resolveTargetDir(final File baseDir, final Map<String, 
String> attributes) {
+    File current = baseDir;
+    for (final String key : KEY_ORDER) {
+      final String value = attributes.get(key);
+      if (value == null) {
+        continue;
+      }
+      current = new File(current, formatSegment(key, value));
+    }
+    return current;
+  }
+
+  public static Map<String, String> parseAttributes(final File file, final 
File pendingDir) {
+    if (file == null) {
+      return Collections.emptyMap();
+    }
+
+    final Map<String, String> attributes = new HashMap<>();
+    File current = file.getParentFile();
+    while (current != null) {
+      final String dirName = current.getName();
+      if (pendingDir != null && current.equals(pendingDir)) {
+        break;
+      }
+      for (final String key : KEY_ORDER) {
+        final String prefix = key + SEGMENT_SEPARATOR;
+        if (dirName.startsWith(prefix)) {
+          extractAndValidateAttributeValue(key, dirName, prefix.length())
+              .ifPresent(value -> attributes.putIfAbsent(key, value));
+          break;
+        }
+      }
+      current = current.getParentFile();
+    }
+    return attributes;
+  }
+
+  public static File findPendingDirectory(final File file) {
+    if (file == null) {
+      return null;
+    }
+    String[] dirs = 
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+    File current = file;
+    while (current != null) {
+      for (final String dir : dirs) {
+        if (current.isDirectory() && current.getAbsolutePath().equals(dir)) {
+          return current;
+        }
+      }
+      current = current.getParentFile();
+    }
+    return null;
+  }
+
+  public static void applyAttributesToStatement(
+      final Map<String, String> attributes,
+      final LoadTsFileStatement statement,
+      final boolean defaultVerify) {
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY))
+        .filter(name -> !name.isEmpty())
+        .ifPresent(statement::setDatabase);
+
+    if (statement.getDatabase() == null || statement.getDatabase().isEmpty()) {
+      Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_KEY))
+          .filter(name -> !name.isEmpty())
+          .ifPresent(statement::setDatabase);
+    }
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY))
+        .ifPresent(
+            level -> {
+              try {
+                statement.setDatabaseLevel(Integer.parseInt(level));
+              } catch (final NumberFormatException ignored) {
+                // keep the default when parsing fails
+              }
+            });
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY))
+        .ifPresent(value -> 
statement.setConvertOnTypeMismatch(Boolean.parseBoolean(value)));
+
+    
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY))
+        .ifPresent(
+            threshold -> {
+              try {
+                
statement.setTabletConversionThresholdBytes(Long.parseLong(threshold));
+              } catch (final NumberFormatException ignored) {
+                // keep the default when parsing fails
+              }
+            });
+
+    if (attributes.containsKey(LoadTsFileConfigurator.VERIFY_KEY)) {
+      statement.setVerifySchema(
+          
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.VERIFY_KEY)));
+    } else {
+      statement.setVerifySchema(defaultVerify);
+    }
+
+    if (attributes.containsKey(LoadTsFileConfigurator.PIPE_GENERATED_KEY)
+        && 
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.PIPE_GENERATED_KEY)))
 {
+      statement.markIsGeneratedByPipe();
+    }
+  }
+
+  public static boolean containsDatabaseName(final Map<String, String> 
attributes) {
+    return attributes.containsKey(LoadTsFileConfigurator.DATABASE_NAME_KEY)
+        || attributes.containsKey(LoadTsFileConfigurator.DATABASE_KEY);
+  }
+
+  private static String formatSegment(final String key, final String value) {
+    return key + SEGMENT_SEPARATOR + encodeValue(value);
+  }
+
+  private static String encodeValue(final String value) {
+    try {
+      return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
+    } catch (final UnsupportedEncodingException e) {
+      // UTF-8 should always be supported; fallback to raw value when 
unexpected
+      return value;
+    }
+  }
+
+  private static Optional<String> extractAndValidateAttributeValue(
+      final String key, final String dirName, final int prefixLength) {
+    if (dirName.length() <= prefixLength) {
+      return Optional.empty();
+    }
+
+    final String encodedValue = dirName.substring(prefixLength);
+    final String decodedValue = decodeValue(encodedValue);
+    try {
+      validateAttributeValue(key, decodedValue);
+      return Optional.of(decodedValue);
+    } catch (final SemanticException e) {
+      return Optional.empty();
+    }
+  }
+
+  private static void validateAttributeValue(final String key, final String 
value) {
+    switch (key) {
+      case LoadTsFileConfigurator.DATABASE_NAME_KEY:
+        if (value == null || value.isEmpty()) {
+          throw new SemanticException("Database name must not be empty.");
+        }
+        break;
+      case LoadTsFileConfigurator.DATABASE_LEVEL_KEY:
+        LoadTsFileConfigurator.validateDatabaseLevelParam(value);
+        break;
+      case LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY:
+        LoadTsFileConfigurator.validateConvertOnTypeMismatchParam(value);
+        break;
+      case LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY:
+        validateTabletConversionThreshold(value);
+        break;
+      case LoadTsFileConfigurator.VERIFY_KEY:
+        LoadTsFileConfigurator.validateVerifyParam(value);
+        break;
+      default:
+        LoadTsFileConfigurator.validateParameters(key, value);
+    }
+  }
+
+  private static void validateTabletConversionThreshold(final String value) {
+    try {
+      final long threshold = Long.parseLong(value);
+      if (threshold < 0) {
+        throw new SemanticException(
+            "Tablet conversion threshold must be a non-negative long value.");
+      }
+    } catch (final NumberFormatException e) {
+      throw new SemanticException(
+          String.format("Tablet conversion threshold '%s' is not a valid long 
value.", value));
+    }
+  }
+
+  private static String decodeValue(final String value) {
+    try {
+      return URLDecoder.decode(value, StandardCharsets.UTF_8.toString());
+    } catch (final UnsupportedEncodingException e) {
+      return value;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 3ec283cd2e0..7b5f7166197 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -34,9 +34,13 @@ public class ActiveLoadPendingQueue {
   private final Set<String> loadingFileSet = new HashSet<>();
 
   public synchronized boolean enqueue(
-      final String file, final boolean isGeneratedByPipe, final boolean 
isTableModel) {
+      final String file,
+      final String pendingDir,
+      final boolean isGeneratedByPipe,
+      final boolean isTableModel) {
     if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
-      pendingFileQueue.offer(new ActiveLoadEntry(file, isGeneratedByPipe, 
isTableModel));
+      pendingFileQueue.offer(
+          new ActiveLoadEntry(file, pendingDir, isGeneratedByPipe, 
isTableModel));
 
       
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
       return true;
@@ -76,11 +80,14 @@ public class ActiveLoadPendingQueue {
 
   public static class ActiveLoadEntry {
     private final String file;
+    private final String pendingDir;
     private final boolean isGeneratedByPipe;
     private final boolean isTableModel;
 
-    public ActiveLoadEntry(String file, boolean isGeneratedByPipe, boolean 
isTableModel) {
+    public ActiveLoadEntry(
+        String file, String pendingDir, boolean isGeneratedByPipe, boolean 
isTableModel) {
       this.file = file;
+      this.pendingDir = pendingDir;
       this.isGeneratedByPipe = isGeneratedByPipe;
       this.isTableModel = isTableModel;
     }
@@ -89,6 +96,10 @@ public class ActiveLoadPendingQueue {
       return file;
     }
 
+    public String getPendingDir() {
+      return pendingDir;
+    }
+
     public boolean isGeneratedByPipe() {
       return isGeneratedByPipe;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 62feb562be7..20817c94146 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -56,6 +56,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.time.ZoneId;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -84,12 +85,12 @@ public class ActiveLoadTsFileLoader {
   }
 
   public void tryTriggerTsFileLoad(
-      String absolutePath, boolean isTabletMode, boolean isGeneratedByPipe) {
+      String absolutePath, String pendingDir, boolean isTabletMode, boolean 
isGeneratedByPipe) {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
       return;
     }
 
-    if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe, isTabletMode)) {
+    if (pendingQueue.enqueue(absolutePath, pendingDir, isGeneratedByPipe, 
isTabletMode)) {
       initFailDirIfNecessary();
       adjustExecutorIfNecessary();
     }
@@ -216,24 +217,29 @@ public class ActiveLoadTsFileLoader {
   private TSStatus loadTsFile(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession 
session)
       throws FileNotFoundException {
-    final LoadTsFileStatement statement = new 
LoadTsFileStatement(entry.getFile());
+    final File tsFile = new File(entry.getFile());
+    final LoadTsFileStatement statement = new 
LoadTsFileStatement(tsFile.getAbsolutePath());
     final List<File> files = statement.getTsFiles();
 
-    // It should be noted here that the instructions in this code block do not 
need to use the
-    // DataBase, so the DataBase is assigned a value of null. If the DataBase 
is used later, an
-    // exception will be thrown.
-    final File parentFile;
-    statement.setDatabase(
-        files.isEmpty()
-                || !entry.isTableModel()
-                || (parentFile = files.get(0).getParentFile()) == null
-            ? null
-            : parentFile.getName());
     statement.setDeleteAfterLoad(true);
-    statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(isVerify);
     statement.setAutoCreateDatabase(
         IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
+
+    final File pendingDir =
+        entry.getPendingDir() == null
+            ? ActiveLoadPathHelper.findPendingDirectory(tsFile)
+            : new File(entry.getPendingDir());
+    final Map<String, String> attributes = 
ActiveLoadPathHelper.parseAttributes(tsFile, pendingDir);
+    ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, 
isVerify);
+
+    final File parentFile;
+    if (statement.getDatabase() == null && entry.isTableModel()) {
+      statement.setDatabase(
+          files.isEmpty() || (parentFile = files.get(0).getParentFile()) == 
null
+              ? null
+              : parentFile.getName());
+    }
+
     return executeStatement(
         entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) : 
statement, session);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
index 669013ec450..e3dbe43507d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
@@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
@@ -45,14 +47,16 @@ public class ActiveLoadUtil {
   private static volatile ILoadDiskSelector loadDiskSelector = 
updateLoadDiskSelector();
 
   public static boolean loadTsFileAsyncToActiveDir(
-      final List<File> tsFiles, final String dataBaseName, final boolean 
isDeleteAfterLoad) {
+      final List<File> tsFiles,
+      final Map<String, String> loadAttributes,
+      final boolean isDeleteAfterLoad) {
     if (tsFiles == null || tsFiles.isEmpty()) {
       return true;
     }
 
     try {
       for (File file : tsFiles) {
-        if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) {
+        if (!loadTsFilesToActiveDir(loadAttributes, file, isDeleteAfterLoad)) {
           return false;
         }
       }
@@ -65,7 +69,7 @@ public class ActiveLoadUtil {
   }
 
   private static boolean loadTsFilesToActiveDir(
-      final String dataBaseName, final File file, final boolean 
isDeleteAfterLoad)
+      final Map<String, String> loadAttributes, final File file, final boolean 
isDeleteAfterLoad)
       throws IOException {
     if (file == null) {
       return true;
@@ -84,12 +88,9 @@ public class ActiveLoadUtil {
       LOGGER.warn("Load active listening dir is not set.");
       return false;
     }
-    final File targetDir;
-    if (Objects.nonNull(dataBaseName)) {
-      targetDir = new File(targetFilePath, dataBaseName);
-    } else {
-      targetDir = targetFilePath;
-    }
+    final Map<String, String> attributes =
+        Objects.nonNull(loadAttributes) ? loadAttributes : 
Collections.emptyMap();
+    final File targetDir = 
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
 
     loadTsFileAsyncToTargetDir(
         targetDir, new File(file.getAbsolutePath() + ".resource"), 
isDeleteAfterLoad);
@@ -100,7 +101,9 @@ public class ActiveLoadUtil {
   }
 
   public static boolean loadFilesToActiveDir(
-      final String dataBaseName, final List<String> files, final boolean 
isDeleteAfterLoad)
+      final Map<String, String> loadAttributes,
+      final List<String> files,
+      final boolean isDeleteAfterLoad)
       throws IOException {
     if (files == null || files.isEmpty()) {
       return true;
@@ -120,12 +123,9 @@ public class ActiveLoadUtil {
       LOGGER.warn("Load active listening dir is not set.");
       return false;
     }
-    final File targetDir;
-    if (Objects.nonNull(dataBaseName)) {
-      targetDir = new File(targetFilePath, dataBaseName);
-    } else {
-      targetDir = targetFilePath;
-    }
+    final Map<String, String> attributes =
+        Objects.nonNull(loadAttributes) ? loadAttributes : 
Collections.emptyMap();
+    final File targetDir = 
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
 
     for (final String file : files) {
       loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad);
@@ -138,6 +138,11 @@ public class ActiveLoadUtil {
     if (!file.exists()) {
       return;
     }
+    if (!targetDir.exists() && !targetDir.mkdirs()) {
+      if (!targetDir.exists()) {
+        throw new IOException("Failed to create target directory " + 
targetDir.getAbsolutePath());
+      }
+    }
     RetryUtils.retryOnException(
         () -> {
           if (isDeleteAfterLoad) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index da94de532d1..8478486781b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -54,6 +54,9 @@ public class LoadTsFileConfigurator {
       case VERIFY_KEY:
         validateVerifyParam(value);
         break;
+      case PIPE_GENERATED_KEY:
+        validatePipeGeneratedParam(value);
+        break;
       case ASYNC_LOAD_KEY:
         validateAsyncLoadParam(value);
         break;
@@ -183,6 +186,21 @@ public class LoadTsFileConfigurator {
         loadAttributes.getOrDefault(VERIFY_KEY, 
String.valueOf(VERIFY_DEFAULT_VALUE)));
   }
 
+  public static final String PIPE_GENERATED_KEY = "pipe-generated";
+
+  public static void validatePipeGeneratedParam(final String pipeGenerated) {
+    if (!"true".equalsIgnoreCase(pipeGenerated) && 
!"false".equalsIgnoreCase(pipeGenerated)) {
+      throw new SemanticException(
+          String.format(
+              "Given %s value '%s' is not supported, please input a valid 
boolean value.",
+              PIPE_GENERATED_KEY, pipeGenerated));
+    }
+  }
+
+  public static boolean parseOrGetDefaultPipeGenerated(final Map<String, 
String> loadAttributes) {
+    return 
Boolean.parseBoolean(loadAttributes.getOrDefault(PIPE_GENERATED_KEY, "false"));
+  }
+
   public static final String ASYNC_LOAD_KEY = "async";
   private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;
 


Reply via email to