This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 699c90eff9d Load: add the function of transferring too many time 
partitions of files to tablets and fixed the problem that the data written to 
tablets is more than expected. (#16320)
699c90eff9d is described below

commit 699c90eff9d1c4969935dc94a2b4479e0d0e5d61
Author: Zhenyu Luo <zhe...@apache.org>
AuthorDate: Thu Sep 11 10:23:30 2025 +0800

    Load: add the function of transferring too many time partitions of files to 
tablets and fixed the problem that the data written to tablets is more than 
expected. (#16320)
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * simplify
    
    * Update pom.xml
    
    * fix
    
    * fix
    
    * fix IT
    
    ---------
    
    Co-authored-by: Caideyipi <87789683+caidey...@users.noreply.github.com>
---
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |  5 ++++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++++++++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++++++
 ...ileInsertionEventTableParserTabletIterator.java | 25 ++++++++++++++++++++--
 .../planner/plan/node/write/InsertTabletNode.java  |  4 ++--
 .../plan/statement/crud/InsertTabletStatement.java | 18 ++++++++++------
 .../converter/LoadTsFileDataTypeConverter.java     | 11 ++++++----
 .../load/splitter/TsFileSplitter.java              | 21 ++++++++++++++++++
 .../planner/node/write/WritePlanNodeSplitTest.java |  3 +++
 9 files changed, 101 insertions(+), 15 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 7d5f7be8a1e..c838bc843ae 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -83,11 +83,14 @@ public class IoTDBLoadTsFileIT {
     tmpDir = new File(Files.createTempDirectory("load").toUri());
     
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
     
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1");
     EnvFactory.getEnv()
         .getConfig()
         .getDataNodeConfig()
         .setConnectionTimeoutInMS(connectionTimeoutInMS)
         
.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);
+
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
@@ -224,7 +227,7 @@ public class IoTDBLoadTsFileIT {
       generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL 
/ 10_000, false);
       generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL 
/ 10_000, false);
       generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL 
/ 10_000, true);
-      for (int i = 0; i < 10000; i++) {
+      for (int i = 0; i < 1000; i++) {
         generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 
10, true);
       }
       writtenPoint2 = generator.getTotalNumber();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ecb18b3c575..55c5410a555 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1108,6 +1108,8 @@ public class IoTDBConfig {
 
   private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB
 
+  private int loadTsFileSpiltPartitionMaxSize = 10;
+
   private String[] loadActiveListeningDirs =
       new String[] {
         IoTDBConstant.EXT_FOLDER_NAME
@@ -3997,6 +3999,27 @@ public class IoTDBConfig {
     this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
 
+  public int getLoadTsFileSpiltPartitionMaxSize() {
+    return loadTsFileSpiltPartitionMaxSize;
+  }
+
+  public void setLoadTsFileSpiltPartitionMaxSize(int 
loadTsFileSpiltPartitionMaxSize) {
+    if (loadTsFileSpiltPartitionMaxSize <= 0) {
+      throw new IllegalArgumentException(
+          "loadTsFileSpiltPartitionMaxSize should be greater than 0");
+    }
+
+    if (this.loadTsFileSpiltPartitionMaxSize == 
loadTsFileSpiltPartitionMaxSize) {
+      return;
+    }
+
+    logger.info(
+        "Set loadTsFileSpiltPartitionMaxSize from {} to {}",
+        this.loadTsFileSpiltPartitionMaxSize,
+        loadTsFileSpiltPartitionMaxSize);
+    this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize;
+  }
+
   public String[] getPipeReceiverFileDirs() {
     return (Objects.isNull(this.pipeReceiverFileDirs) || 
this.pipeReceiverFileDirs.length == 0)
         ? new String[] {systemDir + File.separator + "pipe" + File.separator + 
"receiver"}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8dc51d1964f..2aa679e6db8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2404,6 +2404,12 @@ public class IoTDBDescriptor {
         properties.getProperty(
             "load_active_listening_fail_dir",
             
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
+
+    conf.setLoadTsFileSpiltPartitionMaxSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_split_partition_max_size",
+                Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
   }
 
   private void loadPipeHotModifiedProp(TrimProperties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 1665a8b3e0e..746d9d5b4a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -40,6 +40,7 @@ import org.apache.tsfile.read.controller.IMetadataQuerier;
 import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
 import org.apache.tsfile.read.reader.IChunkReader;
 import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsPrimitiveType;
@@ -394,6 +395,13 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
     for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
       final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
       if (primitiveType == null) {
+        switch (dataTypeList.get(i)) {
+          case TEXT:
+          case BLOB:
+          case STRING:
+            tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
+        }
+        tablet.getBitMaps()[i].mark(rowIndex);
         continue;
       }
 
@@ -420,7 +428,11 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
         case TEXT:
         case BLOB:
         case STRING:
-          tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
+          Binary binary = primitiveType.getBinary();
+          tablet.addValue(
+              rowIndex,
+              i,
+              binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() : 
binary.getValues());
           break;
         default:
           throw new UnSupportedDataTypeException("UnSupported" + 
primitiveType.getDataType());
@@ -431,11 +443,20 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
   private void fillDeviceIdColumns(
       final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
     final String[] deviceIdSegments = (String[]) deviceID.getSegments();
-    for (int i = 1, totalColumns = deviceIdSegments.length; i < totalColumns; 
i++) {
+    int i = 1;
+    for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
       if (deviceIdSegments[i] == null) {
+        tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
+        tablet.getBitMaps()[i - 1].mark(rowIndex);
         continue;
       }
       tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
     }
+
+    while (i <= deviceIdSize) {
+      tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
+      tablet.getBitMaps()[i - 1].mark(rowIndex);
+      i++;
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index be0b00931fd..edbc262971a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -231,7 +231,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
 
     Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new 
LinkedHashMap<>();
 
-    for (int i = 1; i < times.length; i++) { // times are sorted in session 
API.
+    for (int i = 1; i < rowCount; i++) { // times are sorted in session API.
       IDeviceID nextDeviceId = getDeviceID(i);
       if (times[i] >= upperBoundOfTimePartition || 
!currDeviceId.equals(nextDeviceId)) {
         final PartitionSplitInfo splitInfo =
@@ -253,7 +253,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
         deviceIDSplitInfoMap.computeIfAbsent(currDeviceId, deviceID1 -> new 
PartitionSplitInfo());
     // the final range
     splitInfo.ranges.add(startLoc); // included
-    splitInfo.ranges.add(times.length); // excluded
+    splitInfo.ranges.add(rowCount); // excluded
     splitInfo.timePartitionSlots.add(timePartitionSlot);
 
     return deviceIDSplitInfoMap;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index e7eafc4df72..4c255e0882f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -124,12 +124,18 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
   }
 
   private Object convertTableColumn(final Object input) {
-    return input instanceof LocalDate[]
-        ? Arrays.stream(((LocalDate[]) input))
-            .map(date -> Objects.nonNull(date) ? 
DateUtils.parseDateExpressionToInt(date) : 0)
-            .mapToInt(Integer::intValue)
-            .toArray()
-        : input;
+    if (input instanceof LocalDate[]) {
+      return Arrays.stream(((LocalDate[]) input))
+          .map(date -> Objects.nonNull(date) ? 
DateUtils.parseDateExpressionToInt(date) : 0)
+          .mapToInt(Integer::intValue)
+          .toArray();
+    } else if (input instanceof Binary[]) {
+      return Arrays.stream(((Binary[]) input))
+          .map(binary -> Objects.nonNull(binary) ? binary : Binary.EMPTY_VALUE)
+          .toArray(Binary[]::new);
+    }
+
+    return input;
   }
 
   public InsertTabletStatement(InsertTabletNode node) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 59ff67f902d..62deb10b368 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -62,16 +62,19 @@ public class LoadTsFileDataTypeConverter {
 
   private final SqlParser relationalSqlParser = new SqlParser();
   private final LoadTableStatementDataTypeConvertExecutionVisitor
-      tableStatementDataTypeConvertExecutionVisitor =
-          new 
LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
+      tableStatementDataTypeConvertExecutionVisitor;
   private final LoadTreeStatementDataTypeConvertExecutionVisitor
-      treeStatementDataTypeConvertExecutionVisitor =
-          new 
LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
+      treeStatementDataTypeConvertExecutionVisitor;
 
   public LoadTsFileDataTypeConverter(
       final MPPQueryContext context, final boolean isGeneratedByPipe) {
     this.context = context;
     this.isGeneratedByPipe = isGeneratedByPipe;
+
+    tableStatementDataTypeConvertExecutionVisitor =
+        new 
LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
+    treeStatementDataTypeConvertExecutionVisitor =
+        new 
LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
   }
 
   public Optional<TSStatus> convertForTableModel(final LoadTsFile 
loadTsFileTableStatement) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index d194919c18d..5a75f4fb8e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -62,6 +64,8 @@ import java.util.Set;
 public class TsFileSplitter {
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitter.class);
 
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
   private final File tsFile;
   private final TsFileDataConsumer consumer;
   private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
@@ -72,6 +76,7 @@ public class TsFileSplitter {
   private IDeviceID curDevice = null;
   private boolean isAligned;
   private int timeChunkIndexOfCurrentValueColumn = 0;
+  private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>();
 
   // Maintain the number of times the chunk of each measurement appears.
   private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>();
@@ -445,6 +450,14 @@ public class TsFileSplitter {
       }
     }
     for (AlignedChunkData chunkData : chunkDataMap.keySet()) {
+      timePartitionSlots.add(chunkData.getTimePartitionSlot());
+      if (deletions.isEmpty()
+          && timePartitionSlots.size() > 
CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
+        throw new LoadFileException(
+            String.format(
+                "Time partition slots size is greater than %s",
+                CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
+      }
       if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
         throw new IllegalStateException(
             String.format(
@@ -457,6 +470,14 @@ public class TsFileSplitter {
 
   private void consumeChunkData(String measurement, long offset, ChunkData 
chunkData)
       throws LoadFileException {
+    timePartitionSlots.add(chunkData.getTimePartitionSlot());
+    if (deletions.isEmpty()
+        && timePartitionSlots.size() > 
CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
+      throw new LoadFileException(
+          String.format(
+              "Time partition slots size is greater than %s",
+              CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
+    }
     if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
       throw new IllegalStateException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index 807186c26da..8d23bc848b9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -204,6 +204,7 @@ public class WritePlanNodeSplitTest {
     insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
     insertTabletNode.setColumns(
         new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 
100}});
+    insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
 
     DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
     dataPartitionQueryParam.setDeviceID(
@@ -314,6 +315,7 @@ public class WritePlanNodeSplitTest {
       insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
       insertTabletNode.setColumns(
           new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
+      insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
       insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
 
       insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3"));
@@ -322,6 +324,7 @@ public class WritePlanNodeSplitTest {
       insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
       insertTabletNode.setColumns(
           new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
+      insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
       insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
     }
 

Reply via email to