This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new db774554716 [to dev/1.3] Fix slow query threshold & fix a bug for
lastQuery & optimize encodeBatch for multi tvlist scene (#16766)
db774554716 is described below
commit db774554716741273fe6c2ead1761997de6debb5
Author: shuwenwei <[email protected]>
AuthorDate: Mon Nov 24 14:15:12 2025 +0800
[to dev/1.3] Fix slow query threshold & fix a bug for lastQuery & optimize
encodeBatch for multi tvlist scene (#16766)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../plan/planner/OperatorTreeGenerator.java | 4 +-
.../db/utils/datastructure/AlignedTVList.java | 16 ++++++-
.../memtable/AlignedTVListIteratorTest.java | 56 ++++++++++++++++++++++
4 files changed, 73 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index abd86cfef35..aacf9a58f43 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -872,7 +872,7 @@ public class IoTDBConfig {
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
/** time cost(ms) threshold for slow query. Unit: millisecond */
- private long slowQueryThreshold = 30000;
+ private long slowQueryThreshold = 10000;
private int patternMatchingThreshold = 1000000;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index cf34ea8609d..8aad87866b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -156,6 +156,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DeviceLastCache;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -2956,7 +2957,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitLastQueryScan(LastQueryScanNode node,
LocalExecutionPlanContext context) {
- DATA_NODE_SCHEMA_CACHE.cleanUp();
final PartialPath devicePath = node.getDevicePath();
List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas();
List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
@@ -2984,7 +2984,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
- } else if (timeValuePair.getValue() == null) {
+ } else if (timeValuePair.getValue() ==
DeviceLastCache.EMPTY_PRIMITIVE_TYPE) {
// there is no data for this time series, just ignore
} else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
// cached last value is not satisfied
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index cf701383b37..e0c08fbdd43 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -262,6 +262,7 @@ public abstract class AlignedTVList extends TVList {
return getAlignedValueByValueIndex(valueIndex, null, floatPrecision,
encodingList);
}
+ @SuppressWarnings("java:S6541")
private TsPrimitiveType getAlignedValueByValueIndex(
int valueIndex,
int[] validIndexesForTimeDuplicatedRows,
@@ -920,6 +921,7 @@ public abstract class AlignedTVList extends TVList {
}
/** Build TsBlock by column. */
+ @SuppressWarnings("java:S6541")
public TsBlock buildTsBlock(
int floatPrecision, List<TSEncoding> encodingList, List<List<TimeRange>>
deletionList) {
TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
@@ -1155,6 +1157,7 @@ public abstract class AlignedTVList extends TVList {
}
}
+ @SuppressWarnings("java:S6541")
public static AlignedTVList deserialize(DataInputStream stream) throws
IOException {
TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
if (dataType != TSDataType.VECTOR) {
@@ -1429,6 +1432,7 @@ public abstract class AlignedTVList extends TVList {
}
@Override
+ @SuppressWarnings("java:S6541")
protected void prepareNext() {
// find the first row that is neither deleted nor empty (all NULL values)
findValidRow = false;
@@ -1602,6 +1606,7 @@ public abstract class AlignedTVList extends TVList {
}
@Override
+ @SuppressWarnings("java:S6541")
public boolean hasNextBatch() {
if (!paginationController.hasCurLimit()) {
return false;
@@ -1884,11 +1889,18 @@ public abstract class AlignedTVList extends TVList {
}
@Override
+ @SuppressWarnings("java:S6541")
public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ int maxRowCountOfCurrentBatch =
+ Math.min(
+ rows - index,
+ Math.min(
+ (int) encodeInfo.maxNumberOfPointsInChunk -
encodeInfo.pointNumInChunk, // NOSONAR
+ encodeInfo.maxNumberOfPointsInPage -
encodeInfo.pointNumInPage));
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
// duplicated time or deleted time are all invalid, true if we don't
need this row
- BitMap timeDuplicateInfo = null;
+ LazyBitMap timeDuplicateInfo = null;
int startIndex = index;
// time column
@@ -1914,7 +1926,7 @@ public abstract class AlignedTVList extends TVList {
encodeInfo.pointNumInChunk++;
} else {
if (Objects.isNull(timeDuplicateInfo)) {
- timeDuplicateInfo = new BitMap(rows);
+ timeDuplicateInfo = new LazyBitMap(index,
maxRowCountOfCurrentBatch, rows - 1);
}
timeDuplicateInfo.mark(index);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
index 27294152844..a3d1e95d954 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
@@ -27,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -41,6 +43,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.filter.operator.LongFilterOperators;
import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
import org.apache.tsfile.read.reader.series.PaginationController;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.junit.AfterClass;
@@ -895,4 +898,57 @@ public class AlignedTVListIteratorTest {
}
Assert.assertEquals(expectedTimestamps, resultTimestamps);
}
+
+ @Test
+ public void testEncodeBatch() {
+ testEncodeBatch(largeSingleTvListMap, 400000);
+ testEncodeBatch(largeOrderedMultiTvListMap, 400000);
+ testEncodeBatch(largeMergeSortMultiTvListMap, 400000);
+ }
+
+ private void testEncodeBatch(Map<TVList, Integer> tvListMap, long
expectedCount) {
+ AlignedChunkWriterImpl alignedChunkWriter = new
AlignedChunkWriterImpl(getMeasurementSchema());
+ List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
+ IMeasurementSchema measurementSchema = getMeasurementSchema();
+ AlignedReadOnlyMemChunk chunk =
+ new AlignedReadOnlyMemChunk(
+ fragmentInstanceContext,
+ columnIdxList,
+ measurementSchema,
+ tvListMap,
+ Arrays.asList(
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()));
+ chunk.sortTvLists();
+ chunk.initChunkMetaFromTVListsWithFakeStatistics();
+ MemPointIterator memPointIterator =
chunk.createMemPointIterator(Ordering.ASC, null);
+ BatchEncodeInfo encodeInfo =
+ new BatchEncodeInfo(
+ 0, 0, 0, 10000, 100000,
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
+ long[] times = new long[10000];
+ long count = 0;
+ while (memPointIterator.hasNextBatch()) {
+ memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
+ if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
+ alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+ encodeInfo.pointNumInPage = 0;
+ }
+
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
+ alignedChunkWriter.sealCurrentPage();
+ count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
+ alignedChunkWriter.clearPageWriter();
+ alignedChunkWriter = new
AlignedChunkWriterImpl(getMeasurementSchema());
+ encodeInfo.reset();
+ }
+ }
+ // Handle remaining data in the final unsealed chunk
+ if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) {
+ if (encodeInfo.pointNumInPage > 0) {
+ alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+ }
+ alignedChunkWriter.sealCurrentPage();
+ count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
+ }
+ Assert.assertEquals(expectedCount, count);
+ }
}