This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new db774554716 [to dev/1.3] Fix slow query threshold & fix a bug for 
lastQuery & optimize encodeBatch for multi tvlist scene (#16766)
db774554716 is described below

commit db774554716741273fe6c2ead1761997de6debb5
Author: shuwenwei <[email protected]>
AuthorDate: Mon Nov 24 14:15:12 2025 +0800

    [to dev/1.3] Fix slow query threshold & fix a bug for lastQuery & optimize 
encodeBatch for multi tvlist scene (#16766)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../plan/planner/OperatorTreeGenerator.java        |  4 +-
 .../db/utils/datastructure/AlignedTVList.java      | 16 ++++++-
 .../memtable/AlignedTVListIteratorTest.java        | 56 ++++++++++++++++++++++
 4 files changed, 73 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index abd86cfef35..aacf9a58f43 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -872,7 +872,7 @@ public class IoTDBConfig {
   private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
 
   /** time cost(ms) threshold for slow query. Unit: millisecond */
-  private long slowQueryThreshold = 30000;
+  private long slowQueryThreshold = 10000;
 
   private int patternMatchingThreshold = 1000000;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index cf34ea8609d..8aad87866b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -156,6 +156,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DeviceLastCache;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -2956,7 +2957,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
   @Override
   public Operator visitLastQueryScan(LastQueryScanNode node, 
LocalExecutionPlanContext context) {
-    DATA_NODE_SCHEMA_CACHE.cleanUp();
     final PartialPath devicePath = node.getDevicePath();
     List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas();
     List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
@@ -2984,7 +2984,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
       if (timeValuePair == null) { // last value is not cached
         unCachedMeasurementIndexes.add(i);
-      } else if (timeValuePair.getValue() == null) {
+      } else if (timeValuePair.getValue() == 
DeviceLastCache.EMPTY_PRIMITIVE_TYPE) {
         // there is no data for this time series, just ignore
       } else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
         // cached last value is not satisfied
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index cf701383b37..e0c08fbdd43 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -262,6 +262,7 @@ public abstract class AlignedTVList extends TVList {
     return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, 
encodingList);
   }
 
+  @SuppressWarnings("java:S6541")
   private TsPrimitiveType getAlignedValueByValueIndex(
       int valueIndex,
       int[] validIndexesForTimeDuplicatedRows,
@@ -920,6 +921,7 @@ public abstract class AlignedTVList extends TVList {
   }
 
   /** Build TsBlock by column. */
+  @SuppressWarnings("java:S6541")
   public TsBlock buildTsBlock(
       int floatPrecision, List<TSEncoding> encodingList, List<List<TimeRange>> 
deletionList) {
     TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
@@ -1155,6 +1157,7 @@ public abstract class AlignedTVList extends TVList {
     }
   }
 
+  @SuppressWarnings("java:S6541")
   public static AlignedTVList deserialize(DataInputStream stream) throws 
IOException {
     TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
     if (dataType != TSDataType.VECTOR) {
@@ -1429,6 +1432,7 @@ public abstract class AlignedTVList extends TVList {
     }
 
     @Override
+    @SuppressWarnings("java:S6541")
     protected void prepareNext() {
       // find the first row that is neither deleted nor empty (all NULL values)
       findValidRow = false;
@@ -1602,6 +1606,7 @@ public abstract class AlignedTVList extends TVList {
     }
 
     @Override
+    @SuppressWarnings("java:S6541")
     public boolean hasNextBatch() {
       if (!paginationController.hasCurLimit()) {
         return false;
@@ -1884,11 +1889,18 @@ public abstract class AlignedTVList extends TVList {
     }
 
     @Override
+    @SuppressWarnings("java:S6541")
     public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo 
encodeInfo, long[] times) {
+      int maxRowCountOfCurrentBatch =
+          Math.min(
+              rows - index,
+              Math.min(
+                  (int) encodeInfo.maxNumberOfPointsInChunk - 
encodeInfo.pointNumInChunk, // NOSONAR
+                  encodeInfo.maxNumberOfPointsInPage - 
encodeInfo.pointNumInPage));
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
 
       // duplicated time or deleted time are all invalid, true if we don't 
need this row
-      BitMap timeDuplicateInfo = null;
+      LazyBitMap timeDuplicateInfo = null;
 
       int startIndex = index;
       // time column
@@ -1914,7 +1926,7 @@ public abstract class AlignedTVList extends TVList {
           encodeInfo.pointNumInChunk++;
         } else {
           if (Objects.isNull(timeDuplicateInfo)) {
-            timeDuplicateInfo = new BitMap(rows);
+            timeDuplicateInfo = new LazyBitMap(index, 
maxRowCountOfCurrentBatch, rows - 1);
           }
           timeDuplicateInfo.mark(index);
         }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
index 27294152844..a3d1e95d954 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
@@ -27,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
 import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 
@@ -41,6 +43,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.read.filter.operator.LongFilterOperators;
 import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
 import org.apache.tsfile.read.reader.series.PaginationController;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.VectorMeasurementSchema;
 import org.junit.AfterClass;
@@ -895,4 +898,57 @@ public class AlignedTVListIteratorTest {
     }
     Assert.assertEquals(expectedTimestamps, resultTimestamps);
   }
+
+  @Test
+  public void testEncodeBatch() {
+    testEncodeBatch(largeSingleTvListMap, 400000);
+    testEncodeBatch(largeOrderedMultiTvListMap, 400000);
+    testEncodeBatch(largeMergeSortMultiTvListMap, 400000);
+  }
+
+  private void testEncodeBatch(Map<TVList, Integer> tvListMap, long 
expectedCount) {
+    AlignedChunkWriterImpl alignedChunkWriter = new 
AlignedChunkWriterImpl(getMeasurementSchema());
+    List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
+    IMeasurementSchema measurementSchema = getMeasurementSchema();
+    AlignedReadOnlyMemChunk chunk =
+        new AlignedReadOnlyMemChunk(
+            fragmentInstanceContext,
+            columnIdxList,
+            measurementSchema,
+            tvListMap,
+            Arrays.asList(
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()));
+    chunk.sortTvLists();
+    chunk.initChunkMetaFromTVListsWithFakeStatistics();
+    MemPointIterator memPointIterator = 
chunk.createMemPointIterator(Ordering.ASC, null);
+    BatchEncodeInfo encodeInfo =
+        new BatchEncodeInfo(
+            0, 0, 0, 10000, 100000, 
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
+    long[] times = new long[10000];
+    long count = 0;
+    while (memPointIterator.hasNextBatch()) {
+      memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
+      if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
+        alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+        encodeInfo.pointNumInPage = 0;
+      }
+
+      if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
+        alignedChunkWriter.sealCurrentPage();
+        count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
+        alignedChunkWriter.clearPageWriter();
+        alignedChunkWriter = new 
AlignedChunkWriterImpl(getMeasurementSchema());
+        encodeInfo.reset();
+      }
+    }
+    // Handle remaining data in the final unsealed chunk
+    if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) {
+      if (encodeInfo.pointNumInPage > 0) {
+        alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+      }
+      alignedChunkWriter.sealCurrentPage();
+      count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
+    }
+    Assert.assertEquals(expectedCount, count);
+  }
 }

Reply via email to