This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 45823164d0fd4c8b23fda5af050aaca19f83061c Author: shuwenwei <[email protected]> AuthorDate: Tue Nov 25 18:18:16 2025 +0800 Fix slowQueryThreshold & optimize encodeBatch (#16765) (cherry picked from commit 03b60d1390cc399f635d4f1dac276306a5f11660) --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../db/utils/datastructure/AlignedTVList.java | 16 +++++- .../memtable/AlignedTVListIteratorTest.java | 57 ++++++++++++++++++++++ 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 36a148430cb..70a23bbf1a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -812,7 +812,7 @@ public class IoTDBConfig { private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY; /** time cost(ms) threshold for slow query. Unit: millisecond */ - private long slowQueryThreshold = 30000; + private long slowQueryThreshold = 10000; private int patternMatchingThreshold = 1000000; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 2a91c6f09ce..36aefd2c976 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -278,6 +278,7 @@ public abstract class AlignedTVList extends TVList { return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList); } + @SuppressWarnings("java:S6541") private TsPrimitiveType getAlignedValueByValueIndex( int valueIndex, int[] validIndexesForTimeDuplicatedRows, @@ -1051,6 +1052,7 @@ public abstract class AlignedTVList extends TVList { } /** Build TsBlock by column. */ + @SuppressWarnings("java:S6541") public TsBlock buildTsBlock( int floatPrecision, List<TSEncoding> encodingList, @@ -1371,6 +1373,7 @@ public abstract class AlignedTVList extends TVList { } } + @SuppressWarnings("java:S6541") public static AlignedTVList deserialize(DataInputStream stream) throws IOException { TSDataType dataType = ReadWriteIOUtils.readDataType(stream); if (dataType != TSDataType.VECTOR) { @@ -1693,6 +1696,7 @@ public abstract class AlignedTVList extends TVList { } @Override + @SuppressWarnings("java:S6541") protected void prepareNext() { // find the first row that is neither deleted nor empty (all NULL values) findValidRow = false; @@ -1870,6 +1874,7 @@ public abstract class AlignedTVList extends TVList { } @Override + @SuppressWarnings("java:S6541") public boolean hasNextBatch() { if (!paginationController.hasCurLimit()) { return false; @@ -2150,11 +2155,18 @@ public abstract class AlignedTVList extends TVList { } @Override + @SuppressWarnings("java:S6541") public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, long[] times) { + int maxRowCountOfCurrentBatch = + Math.min( + rows - index, + Math.min( + (int) encodeInfo.maxNumberOfPointsInChunk - encodeInfo.pointNumInChunk, // NOSONAR + encodeInfo.maxNumberOfPointsInPage - encodeInfo.pointNumInPage)); AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; // duplicated time or deleted time are all invalid, true if we don't need this row - BitMap timeDuplicateInfo = null; + LazyBitMap timeDuplicateInfo = null; int startIndex = index; // time column @@ -2183,7 +2195,7 @@ public abstract class AlignedTVList extends TVList { encodeInfo.pointNumInChunk++; } else { if (Objects.isNull(timeDuplicateInfo)) { - timeDuplicateInfo = new BitMap(rows); + timeDuplicateInfo = new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1); } timeDuplicateInfo.mark(index); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java index 2dbbb5df865..b0c383e3213 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; @@ -27,6 +28,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; +import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo; import org.apache.iotdb.db.utils.datastructure.MemPointIterator; import org.apache.iotdb.db.utils.datastructure.TVList; @@ -41,6 +43,7 @@ import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.filter.operator.LongFilterOperators; import org.apache.tsfile.read.filter.operator.TimeFilterOperators; import org.apache.tsfile.read.reader.series.PaginationController; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.VectorMeasurementSchema; import org.junit.AfterClass; @@ -952,4 +955,58 @@ public class AlignedTVListIteratorTest { } Assert.assertEquals(expectedTimestamps, resultTimestamps); } + + @Test + public void testEncodeBatch() { + testEncodeBatch(largeSingleTvListMap, 400000); + testEncodeBatch(largeOrderedMultiTvListMap, 400000); + testEncodeBatch(largeMergeSortMultiTvListMap, 400000); + } + + private void testEncodeBatch(Map<TVList, Integer> tvListMap, long expectedCount) { + AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema()); + List<Integer> columnIdxList = Arrays.asList(0, 1, 2); + IMeasurementSchema measurementSchema = getMeasurementSchema(); + AlignedReadOnlyMemChunk chunk = + new AlignedReadOnlyMemChunk( + fragmentInstanceContext, + columnIdxList, + measurementSchema, + tvListMap, + Collections.emptyList(), + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); + chunk.sortTvLists(); + chunk.initChunkMetaFromTVListsWithFakeStatistics(); + MemPointIterator memPointIterator = chunk.createMemPointIterator(Ordering.ASC, null); + BatchEncodeInfo encodeInfo = + new BatchEncodeInfo( + 0, 0, 0, 10000, 100000, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()); + long[] times = new long[10000]; + long count = 0; + while (memPointIterator.hasNextBatch()) { + memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times); + if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) { + alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0); + encodeInfo.pointNumInPage = 0; + } + + if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) { + alignedChunkWriter.sealCurrentPage(); + alignedChunkWriter.clearPageWriter(); + count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount(); + alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema()); + encodeInfo.reset(); + } + } + // Handle remaining data in the final unsealed chunk + if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) { + if (encodeInfo.pointNumInPage > 0) { + alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0); + } + alignedChunkWriter.sealCurrentPage(); + count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount(); + } + Assert.assertEquals(expectedCount, count); + } }
