This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 03b60d1390c Fix slowQueryThreshold & optimize encodeBatch (#16765)
03b60d1390c is described below
commit 03b60d1390cc399f635d4f1dac276306a5f11660
Author: shuwenwei <[email protected]>
AuthorDate: Tue Nov 25 18:18:16 2025 +0800
Fix slowQueryThreshold & optimize encodeBatch (#16765)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../db/utils/datastructure/AlignedTVList.java      | 16 +++++-
 .../memtable/AlignedTVListIteratorTest.java        | 57 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b0b0c083229..26885a56ff1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -812,7 +812,7 @@ public class IoTDBConfig {
   private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
 
   /** time cost(ms) threshold for slow query. Unit: millisecond */
-  private long slowQueryThreshold = 30000;
+  private long slowQueryThreshold = 10000;
 
   private int patternMatchingThreshold = 1000000;
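The change above lowers the default slow-query threshold from 30 s to 10 s. As a rough illustration of how such a millisecond threshold is applied (a minimal, hypothetical sketch; the class and method names below are not IoTDB's actual slow-query code path):

// Hypothetical sketch: flag a query as slow when its cost exceeds the threshold.
public class SlowQueryThresholdSketch {
  // assumed default mirroring the new value in the diff above (unit: millisecond)
  private static final long SLOW_QUERY_THRESHOLD_MS = 10_000L;

  static boolean isSlowQuery(long queryCostMs) {
    return queryCostMs > SLOW_QUERY_THRESHOLD_MS;
  }

  public static void main(String[] args) {
    // a 12 s query is now reported as slow; under the old 30 s default it was not
    System.out.println(isSlowQuery(12_000L)); // prints: true
  }
}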
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 7cc84904348..74326a22726 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -277,6 +277,7 @@ public abstract class AlignedTVList extends TVList {
     return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList);
   }
 
+  @SuppressWarnings("java:S6541")
   private TsPrimitiveType getAlignedValueByValueIndex(
       int valueIndex,
       int[] validIndexesForTimeDuplicatedRows,
@@ -1046,6 +1047,7 @@ public abstract class AlignedTVList extends TVList {
   }
 
   /** Build TsBlock by column. */
+  @SuppressWarnings("java:S6541")
   public TsBlock buildTsBlock(
       int floatPrecision,
       List<TSEncoding> encodingList,
@@ -1363,6 +1365,7 @@ public abstract class AlignedTVList extends TVList {
     }
   }
 
+  @SuppressWarnings("java:S6541")
   public static AlignedTVList deserialize(DataInputStream stream) throws IOException {
     TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
     if (dataType != TSDataType.VECTOR) {
@@ -1684,6 +1687,7 @@ public abstract class AlignedTVList extends TVList {
     }
 
     @Override
+    @SuppressWarnings("java:S6541")
     protected void prepareNext() {
       // find the first row that is neither deleted nor empty (all NULL values)
       findValidRow = false;
@@ -1860,6 +1864,7 @@ public abstract class AlignedTVList extends TVList {
     }
 
     @Override
+    @SuppressWarnings("java:S6541")
     public boolean hasNextBatch() {
       if (!paginationController.hasCurLimit()) {
         return false;
@@ -2139,11 +2144,18 @@ public abstract class AlignedTVList extends TVList {
     }
 
     @Override
+    @SuppressWarnings("java:S6541")
     public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, long[] times) {
+      int maxRowCountOfCurrentBatch =
+          Math.min(
+              rows - index,
+              Math.min(
+                  (int) encodeInfo.maxNumberOfPointsInChunk - encodeInfo.pointNumInChunk, // NOSONAR
+                  encodeInfo.maxNumberOfPointsInPage - encodeInfo.pointNumInPage));
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
       // duplicated time or deleted time are all invalid, true if we don't need this row
-      BitMap timeDuplicateInfo = null;
+      LazyBitMap timeDuplicateInfo = null;
       int startIndex = index;
 
       // time column
@@ -2172,7 +2184,7 @@
             encodeInfo.pointNumInChunk++;
           } else {
             if (Objects.isNull(timeDuplicateInfo)) {
-              timeDuplicateInfo = new BitMap(rows);
+              timeDuplicateInfo = new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1);
             }
             timeDuplicateInfo.mark(index);
           }
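The encodeBatch change above caps each batch at the minimum of the rows left in the TVList, the points still allowed in the current chunk, and the points still allowed in the current page, and it replaces the row-count-sized BitMap with a LazyBitMap that only has to cover that batch. The LazyBitMap class itself is not part of this diff; the sketch below is a hypothetical illustration of the underlying idea (a bitmap restricted to a row range), not the actual implementation.

// Hypothetical sketch of a range-restricted duplicate-row bitmap; names and
// layout are illustrative only and do not reflect the real LazyBitMap.
final class RangeBitMapSketch {
  private final int offset;     // first row index the current batch can touch
  private final boolean[] bits; // one flag per row in the covered range

  RangeBitMapSketch(int offset, int maxRowCount, int lastRowIndex) {
    this.offset = offset;
    // cover at most maxRowCount rows, and never past the last row of the list
    this.bits = new boolean[Math.max(0, Math.min(maxRowCount, lastRowIndex - offset + 1))];
  }

  void mark(int rowIndex) {
    bits[rowIndex - offset] = true;
  }

  boolean isMarked(int rowIndex) {
    int i = rowIndex - offset;
    return i >= 0 && i < bits.length && bits[i];
  }
}

Compared with new BitMap(rows), the allocation is proportional to the batch size rather than to the whole list.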
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
index 2dbbb5df865..b0c383e3213 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
 import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -41,6 +43,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.read.filter.operator.LongFilterOperators;
 import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
 import org.apache.tsfile.read.reader.series.PaginationController;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.VectorMeasurementSchema;
 import org.junit.AfterClass;
@@ -952,4 +955,58 @@ public class AlignedTVListIteratorTest {
     }
     Assert.assertEquals(expectedTimestamps, resultTimestamps);
   }
+
+  @Test
+  public void testEncodeBatch() {
+    testEncodeBatch(largeSingleTvListMap, 400000);
+    testEncodeBatch(largeOrderedMultiTvListMap, 400000);
+    testEncodeBatch(largeMergeSortMultiTvListMap, 400000);
+  }
+
+  private void testEncodeBatch(Map<TVList, Integer> tvListMap, long expectedCount) {
+    AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
+    List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
+    IMeasurementSchema measurementSchema = getMeasurementSchema();
+    AlignedReadOnlyMemChunk chunk =
+        new AlignedReadOnlyMemChunk(
+            fragmentInstanceContext,
+            columnIdxList,
+            measurementSchema,
+            tvListMap,
+            Collections.emptyList(),
+            Arrays.asList(
+                Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+    chunk.sortTvLists();
+    chunk.initChunkMetaFromTVListsWithFakeStatistics();
+    MemPointIterator memPointIterator = chunk.createMemPointIterator(Ordering.ASC, null);
+    BatchEncodeInfo encodeInfo =
+        new BatchEncodeInfo(
+            0, 0, 0, 10000, 100000, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
+    long[] times = new long[10000];
+    long count = 0;
+    while (memPointIterator.hasNextBatch()) {
+      memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
+      if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
+        alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+        encodeInfo.pointNumInPage = 0;
+      }
+
+      if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
+        alignedChunkWriter.sealCurrentPage();
+        alignedChunkWriter.clearPageWriter();
+        count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount();
+        alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
+        encodeInfo.reset();
+      }
+    }
+    // Handle remaining data in the final unsealed chunk
+    if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) {
+      if (encodeInfo.pointNumInPage > 0) {
+        alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+      }
+      alignedChunkWriter.sealCurrentPage();
+      count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount();
+    }
+    Assert.assertEquals(expectedCount, count);
+  }
 }