This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push: new 699c90eff9d Load: add the function of transferring too many time partitions of files to tablets and fixed the problem that the data written to tablets is more than expected. (#16320) 699c90eff9d is described below commit 699c90eff9d1c4969935dc94a2b4479e0d0e5d61 Author: Zhenyu Luo <zhe...@apache.org> AuthorDate: Thu Sep 11 10:23:30 2025 +0800 Load: add the function of transferring too many time partitions of files to tablets and fixed the problem that the data written to tablets is more than expected. (#16320) * update * update * update * update * update * update * simplify * Update pom.xml * fix * fix * fix IT --------- Co-authored-by: Caideyipi <87789683+caidey...@users.noreply.github.com> --- .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 5 ++++- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++++++++++++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++++++ ...ileInsertionEventTableParserTabletIterator.java | 25 ++++++++++++++++++++-- .../planner/plan/node/write/InsertTabletNode.java | 4 ++-- .../plan/statement/crud/InsertTabletStatement.java | 18 ++++++++++------ .../converter/LoadTsFileDataTypeConverter.java | 11 ++++++---- .../load/splitter/TsFileSplitter.java | 21 ++++++++++++++++++ .../planner/node/write/WritePlanNodeSplitTest.java | 3 +++ 9 files changed, 101 insertions(+), 15 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 7d5f7be8a1e..c838bc843ae 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -83,11 +83,14 @@ public class IoTDBLoadTsFileIT { tmpDir = new File(Files.createTempDirectory("load").toUri()); EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL); EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false); + EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false); + EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1"); EnvFactory.getEnv() .getConfig() .getDataNodeConfig() .setConnectionTimeoutInMS(connectionTimeoutInMS) .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes); + EnvFactory.getEnv().initClusterEnvironment(); } @@ -224,7 +227,7 @@ public class IoTDBLoadTsFileIT { generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false); generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false); generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, true); - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < 1000; i++) { generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, true); } writtenPoint2 = generator.getTotalNumber(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index ecb18b3c575..55c5410a555 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1108,6 +1108,8 @@ public class IoTDBConfig { private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB + private int loadTsFileSpiltPartitionMaxSize = 10; + private String[] loadActiveListeningDirs = new String[] { IoTDBConstant.EXT_FOLDER_NAME @@ -3997,6 +3999,27 @@ public class IoTDBConfig { this.pipeReceiverFileDirs = pipeReceiverFileDirs; } + public int getLoadTsFileSpiltPartitionMaxSize() { + return loadTsFileSpiltPartitionMaxSize; + } + + public void setLoadTsFileSpiltPartitionMaxSize(int loadTsFileSpiltPartitionMaxSize) { + if (loadTsFileSpiltPartitionMaxSize <= 0) { + throw new IllegalArgumentException( + "loadTsFileSpiltPartitionMaxSize should be greater than or equal to 0"); + } + + if (this.loadTsFileSpiltPartitionMaxSize == loadTsFileSpiltPartitionMaxSize) { + return; + } + + logger.info( + "Set loadTsFileSpiltPartitionMaxSize from {} to {}", + this.loadTsFileSpiltPartitionMaxSize, + loadTsFileSpiltPartitionMaxSize); + this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize; + } + public String[] getPipeReceiverFileDirs() { return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0) ? new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 8dc51d1964f..2aa679e6db8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2404,6 +2404,12 @@ public class IoTDBDescriptor { properties.getProperty( "load_active_listening_fail_dir", ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir"))); + + conf.setLoadTsFileSpiltPartitionMaxSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_split_partition_max_size", + Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize())))); } private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index 1665a8b3e0e..746d9d5b4a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -40,6 +40,7 @@ import org.apache.tsfile.read.controller.IMetadataQuerier; import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; import org.apache.tsfile.read.reader.IChunkReader; import org.apache.tsfile.read.reader.chunk.TableChunkReader; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; @@ -394,6 +395,13 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) { final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize]; if (primitiveType == null) { + switch (dataTypeList.get(i)) { + case TEXT: + case BLOB: + case STRING: + tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); + } + tablet.getBitMaps()[i].mark(rowIndex); continue; } @@ -420,7 +428,11 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T case TEXT: case BLOB: case STRING: - tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues()); + Binary binary = primitiveType.getBinary(); + tablet.addValue( + rowIndex, + i, + binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() : binary.getValues()); break; default: throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType()); @@ -431,11 +443,20 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T private void fillDeviceIdColumns( final IDeviceID deviceID, final Tablet tablet, final int rowIndex) { final String[] deviceIdSegments = (String[]) deviceID.getSegments(); - for (int i = 1, totalColumns = deviceIdSegments.length; i < totalColumns; i++) { + int i = 1; + for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) { if (deviceIdSegments[i] == null) { + tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues()); + tablet.getBitMaps()[i - 1].mark(rowIndex); continue; } tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]); } + + while (i <= deviceIdSize) { + tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues()); + tablet.getBitMaps()[i - 1].mark(rowIndex); + i++; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index be0b00931fd..edbc262971a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -231,7 +231,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new LinkedHashMap<>(); - for (int i = 1; i < times.length; i++) { // times are sorted in session API. + for (int i = 1; i < rowCount; i++) { // times are sorted in session API. IDeviceID nextDeviceId = getDeviceID(i); if (times[i] >= upperBoundOfTimePartition || !currDeviceId.equals(nextDeviceId)) { final PartitionSplitInfo splitInfo = @@ -253,7 +253,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { deviceIDSplitInfoMap.computeIfAbsent(currDeviceId, deviceID1 -> new PartitionSplitInfo()); // the final range splitInfo.ranges.add(startLoc); // included - splitInfo.ranges.add(times.length); // excluded + splitInfo.ranges.add(rowCount); // excluded splitInfo.timePartitionSlots.add(timePartitionSlot); return deviceIDSplitInfoMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index e7eafc4df72..4c255e0882f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -124,12 +124,18 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem } private Object convertTableColumn(final Object input) { - return input instanceof LocalDate[] - ? Arrays.stream(((LocalDate[]) input)) - .map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0) - .mapToInt(Integer::intValue) - .toArray() - : input; + if (input instanceof LocalDate[]) { + return Arrays.stream(((LocalDate[]) input)) + .map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0) + .mapToInt(Integer::intValue) + .toArray(); + } else if (input instanceof Binary[]) { + return Arrays.stream(((Binary[]) input)) + .map(binary -> Objects.nonNull(binary) ? binary : Binary.EMPTY_VALUE) + .toArray(Binary[]::new); + } + + return input; } public InsertTabletStatement(InsertTabletNode node) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java index 59ff67f902d..62deb10b368 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java @@ -62,16 +62,19 @@ public class LoadTsFileDataTypeConverter { private final SqlParser relationalSqlParser = new SqlParser(); private final LoadTableStatementDataTypeConvertExecutionVisitor - tableStatementDataTypeConvertExecutionVisitor = - new LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel); + tableStatementDataTypeConvertExecutionVisitor; private final LoadTreeStatementDataTypeConvertExecutionVisitor - treeStatementDataTypeConvertExecutionVisitor = - new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel); + treeStatementDataTypeConvertExecutionVisitor; public LoadTsFileDataTypeConverter( final MPPQueryContext context, final boolean isGeneratedByPipe) { this.context = context; this.isGeneratedByPipe = isGeneratedByPipe; + + tableStatementDataTypeConvertExecutionVisitor = + new LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel); + treeStatementDataTypeConvertExecutionVisitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel); } public Optional<TSStatus> convertForTableModel(final LoadTsFile loadTsFileTableStatement) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index d194919c18d..5a75f4fb8e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -21,6 +21,8 @@ package org.apache.iotdb.db.storageengine.load.splitter; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; @@ -62,6 +64,8 @@ import java.util.Set; public class TsFileSplitter { private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private final File tsFile; private final TsFileDataConsumer consumer; private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>(); @@ -72,6 +76,7 @@ public class TsFileSplitter { private IDeviceID curDevice = null; private boolean isAligned; private int timeChunkIndexOfCurrentValueColumn = 0; + private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>(); // Maintain the number of times the chunk of each measurement appears. private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>(); @@ -445,6 +450,14 @@ public class TsFileSplitter { } } for (AlignedChunkData chunkData : chunkDataMap.keySet()) { + timePartitionSlots.add(chunkData.getTimePartitionSlot()); + if (deletions.isEmpty() + && timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) { + throw new LoadFileException( + String.format( + "Time partition slots size is greater than %s", + CONFIG.getLoadTsFileSpiltPartitionMaxSize())); + } if (Boolean.FALSE.equals(consumer.apply(chunkData))) { throw new IllegalStateException( String.format( @@ -457,6 +470,14 @@ public class TsFileSplitter { private void consumeChunkData(String measurement, long offset, ChunkData chunkData) throws LoadFileException { + timePartitionSlots.add(chunkData.getTimePartitionSlot()); + if (deletions.isEmpty() + && timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) { + throw new LoadFileException( + String.format( + "Time partition slots size is greater than %s", + CONFIG.getLoadTsFileSpiltPartitionMaxSize())); + } if (Boolean.FALSE.equals(consumer.apply(chunkData))) { throw new IllegalStateException( String.format( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java index 807186c26da..8d23bc848b9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java @@ -204,6 +204,7 @@ public class WritePlanNodeSplitTest { insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32}); insertTabletNode.setColumns( new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}}); + insertTabletNode.setRowCount(insertTabletNode.getTimes().length); DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); dataPartitionQueryParam.setDeviceID( @@ -314,6 +315,7 @@ public class WritePlanNodeSplitTest { insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32}); insertTabletNode.setColumns( new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}}); + insertTabletNode.setRowCount(insertTabletNode.getTimes().length); insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i); insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3")); @@ -322,6 +324,7 @@ public class WritePlanNodeSplitTest { insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32}); insertTabletNode.setColumns( new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}}); + insertTabletNode.setRowCount(insertTabletNode.getTimes().length); insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i); }