This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch fix_load_time_only_chunk in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ce979f2327e3e76c86ba3c12c3b7c6a4d7276877 Author: Tian Jiang <[email protected]> AuthorDate: Sat May 9 12:26:59 2026 +0800 Fix that load tsfile may skip time-only aligned chunks --- .../load/splitter/TsFileSplitter.java | 9 +- .../db/storageengine/load/TsFileSplitterTest.java | 157 +++++++++++++++++++++ 2 files changed, 163 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index bbfd8f1bb30..e8480a93517 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -166,6 +166,12 @@ public class TsFileSplitter { isAligned = ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK); + if (isAligned) { + pageIndex2Times = new HashMap<>(); + pageIndex2ChunkData = new HashMap<>(); + isTimeChunkNeedDecode = true; + } + IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); // When loading TsFile with Chunk in data zone but no matched ChunkMetadata // at the end of file, this Chunk needs to be skipped. @@ -359,9 +365,6 @@ public class TsFileSplitter { pageIndex2TimesList.add(pageIndex2Times); pageIndex2ChunkDataList.add(pageIndex2ChunkData); isTimeChunkNeedDecodeList.add(isTimeChunkNeedDecode); - pageIndex2Times = new HashMap<>(); - pageIndex2ChunkData = new HashMap<>(); - isTimeChunkNeedDecode = true; } private void switchToTimeChunkContextOfCurrentMeasurement( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java new file mode 100644 index 00000000000..6610880567e --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.splitter; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class TsFileSplitterTest { + + @Test + public void testSplitTableTimeOnlyAlignedChunk() throws Exception { + final File sourceTsFile = new File("split-table-time-only-source.tsfile"); + final File targetTsFile = new File("split-table-time-only-target.tsfile"); + final IDeviceID deviceID = new StringArrayDeviceID("table1", "tagA"); + + try { + writeTableTsFileWithTimeOnlyChunk(sourceTsFile, deviceID); + + final List<ChunkData> emittedChunkDataList = new ArrayList<>(); + final TsFileSplitter splitter = + new TsFileSplitter( + sourceTsFile, + tsFileData -> { + if (tsFileData instanceof ChunkData) { + emittedChunkDataList.add((ChunkData) tsFileData); + } + return true; + }); + splitter.splitTsFileByDataPartition(); + + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + try (final TsFileIOWriter writer = new TsFileIOWriter(targetTsFile)) { + writer.setSchema(createSchema()); + IDeviceID currentDeviceID = null; + for (final ChunkData chunkData : emittedChunkDataList) { + if (!Objects.equals(currentDeviceID, chunkData.getDevice())) { + if (Objects.nonNull(currentDeviceID)) { + writer.endChunkGroup(); + } + writer.startChunkGroup(chunkData.getDevice()); + currentDeviceID = chunkData.getDevice(); + } + + writeSerializedChunkDataToWriter(chunkData, writer); + } + if (Objects.nonNull(currentDeviceID)) { + writer.endChunkGroup(); + } + writer.endFile(); + } + + Assert.assertEquals(1, emittedChunkDataList.size()); + try (final TsFileSequenceReader reader = + new TsFileSequenceReader(targetTsFile.getAbsolutePath())) { + final List<AbstractAlignedChunkMetadata> chunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, false); + Assert.assertEquals(1, chunkMetadataList.size()); + Assert.assertEquals( + 2, chunkMetadataList.get(0).getTimeChunkMetadata().getStatistics().getCount()); + Assert.assertTrue(chunkMetadataList.get(0).getValueChunkMetadataList().isEmpty()); + } + } finally { + if (sourceTsFile.exists()) { + Assert.assertTrue(sourceTsFile.delete()); + } + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + } + } + + private void writeTableTsFileWithTimeOnlyChunk(final File tsFile, final IDeviceID deviceID) + throws Exception { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.setSchema(createSchema()); + writer.startChunkGroup(deviceID); + + final AlignedChunkWriterImpl chunkWriter = + new AlignedChunkWriterImpl(Collections.emptyList()); + chunkWriter.write(100); + chunkWriter.write(101); + chunkWriter.writeToFileWriter(writer); + + writer.endChunkGroup(); + writer.endFile(); + } + } + + private Schema createSchema() { + final List<IMeasurementSchema> tableSchemaList = + Arrays.asList( + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64)); + final List<ColumnCategory> columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD); + + final Schema schema = new Schema(); + schema.registerTableSchema(new TableSchema("table1", tableSchemaList, columnCategoryList)); + return schema; + } + + private void writeSerializedChunkDataToWriter( + final ChunkData chunkData, final TsFileIOWriter writer) throws Exception { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + chunkData.serialize(dataOutputStream); + } + ((ChunkData) + TsFileData.deserialize(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) + .writeToFileWriter(writer); + } +}
