This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch skipNotSatisfiedTimeRange
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11e63cba39294526d9ad209e5044f08a3fdda205
Author: shuwenwei <[email protected]>
AuthorDate: Mon Oct 13 17:37:53 2025 +0800

    optimize nextBatch
---
 .../db/utils/datastructure/AlignedTVList.java      | 31 +++++---
 .../iotdb/db/utils/datastructure/TVList.java       | 84 +++++++++++++++++-----
 2 files changed, 88 insertions(+), 27 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 9a7e86c99ba..3b80383a8bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -1875,7 +1875,8 @@ public abstract class AlignedTVList extends TVList {
 
     @Override
     public TsBlock nextBatch() {
-      TsBlockBuilder builder = new TsBlockBuilder(maxNumberOfPointsInPage, 
dataTypeList);
+      int maxRowCountOfCurrentBatch = Math.min(rows - index, 
maxNumberOfPointsInPage);
+      TsBlockBuilder builder = new TsBlockBuilder(maxRowCountOfCurrentBatch, 
dataTypeList);
       // Time column
       TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
 
@@ -2022,9 +2023,9 @@ public abstract class AlignedTVList extends TVList {
       TsBlock tsBlock = builder.build();
       if (ignoreAllNullRows && needRebuildTsBlock(hasAnyNonNullValue)) {
         // if exist all null rows, at most have validRowCount - 1 valid rows
+        // When rebuilding TsBlock, pushDownFilter and paginationController 
are also processed.
         tsBlock = reBuildTsBlock(hasAnyNonNullValue, validRowCount, 
dataTypeList, tsBlock);
-      }
-      if (pushDownFilter != null) {
+      } else if (pushDownFilter != null) {
         tsBlock =
             TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
                 tsBlock,
@@ -2086,21 +2087,35 @@ public abstract class AlignedTVList extends TVList {
         int previousValidRowCount,
         List<TSDataType> tsDataTypeList,
         TsBlock previousTsBlock) {
+      boolean[] selection = hasAnyNonNullValue;
+      if (pushDownFilter != null) {
+        selection = pushDownFilter.satisfyTsBlock(hasAnyNonNullValue, 
previousTsBlock);
+      }
       TsBlockBuilder builder = new TsBlockBuilder(previousValidRowCount - 1, 
tsDataTypeList);
       TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
       Column timeColumn = previousTsBlock.getTimeColumn();
+      int stopIndex = 0;
       for (int i = 0; i < previousValidRowCount; i++) {
-        if (hasAnyNonNullValue[i]) {
-          timeColumnBuilder.writeLong(timeColumn.getLong(i));
-          builder.declarePosition();
+        if (selection[i]) {
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            selection[i] = false;
+          } else if (paginationController.hasCurLimit()) {
+            timeColumnBuilder.writeLong(timeColumn.getLong(i));
+            builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            stopIndex = i;
+            break;
+          }
         }
       }
 
       for (int columnIndex = 0; columnIndex < tsDataTypeList.size(); 
columnIndex++) {
         ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
         Column column = previousTsBlock.getColumn(columnIndex);
-        for (int i = 0; i < previousValidRowCount; i++) {
-          if (hasAnyNonNullValue[i]) {
+        for (int i = 0; i < stopIndex; i++) {
+          if (selection[i]) {
             if (column.isNull(i)) {
               columnBuilder.appendNull();
             } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 7b98c548867..f4a571be220 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -787,15 +787,6 @@ public abstract class TVList implements WALEntryValue {
         }
       }
       int newIndex = getScanOrderIndex(indexInTVList);
-      System.out.println(
-          "skip to newIndex: "
-              + newIndex
-              + ", old index is "
-              + getScanOrderIndex(index)
-              + ", current time range is "
-              + timeRange
-              + ", current time is "
-              + getTime(getScanOrderIndex(newIndex)));
       if (newIndex > index) {
         index = newIndex;
       }
@@ -912,10 +903,19 @@ public abstract class TVList implements WALEntryValue {
     @Override
     public TsBlock nextBatch() {
       TSDataType dataType = getDataType();
-      TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(dataType));
+      int maxRowCountOfCurrentBatch =
+          Math.min(
+              paginationController.hasLimit()
+                  ? (int) paginationController.getCurLimit()
+                  : Integer.MAX_VALUE,
+              Math.min(maxNumberOfPointsInPage, rows - index));
+      TsBlockBuilder builder =
+          new TsBlockBuilder(maxRowCountOfCurrentBatch, 
Collections.singletonList(dataType));
       switch (dataType) {
         case BOOLEAN:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -926,6 +926,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               boolean aBoolean = getBoolean(getScanOrderIndex(index));
               if (pushDownFilter == null || 
pushDownFilter.satisfyBoolean(time, aBoolean)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeBoolean(aBoolean);
                 builder.declarePosition();
@@ -936,7 +942,9 @@ public abstract class TVList implements WALEntryValue {
           break;
         case INT32:
         case DATE:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -947,6 +955,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               int anInt = getInt(getScanOrderIndex(index));
               if (pushDownFilter == null || 
pushDownFilter.satisfyInteger(time, anInt)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeInt(anInt);
                 builder.declarePosition();
@@ -957,7 +971,9 @@ public abstract class TVList implements WALEntryValue {
           break;
         case INT64:
         case TIMESTAMP:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -968,6 +984,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               long aLong = getLong(getScanOrderIndex(index));
               if (pushDownFilter == null || pushDownFilter.satisfyLong(time, 
aLong)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeLong(aLong);
                 builder.declarePosition();
@@ -977,7 +999,9 @@ public abstract class TVList implements WALEntryValue {
           }
           break;
         case FLOAT:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -990,6 +1014,12 @@ public abstract class TVList implements WALEntryValue {
                   roundValueWithGivenPrecision(
                       getFloat(getScanOrderIndex(index)), floatPrecision, 
encoding);
               if (pushDownFilter == null || pushDownFilter.satisfyFloat(time, 
aFloat)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeFloat(aFloat);
                 builder.declarePosition();
@@ -999,7 +1029,9 @@ public abstract class TVList implements WALEntryValue {
           }
           break;
         case DOUBLE:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -1012,6 +1044,12 @@ public abstract class TVList implements WALEntryValue {
                   roundValueWithGivenPrecision(
                       getDouble(getScanOrderIndex(index)), floatPrecision, 
encoding);
               if (pushDownFilter == null || pushDownFilter.satisfyDouble(time, 
aDouble)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeDouble(aDouble);
                 builder.declarePosition();
@@ -1023,7 +1061,9 @@ public abstract class TVList implements WALEntryValue {
         case TEXT:
         case BLOB:
         case STRING:
-          while (index < rows && builder.getPositionCount() < 
maxNumberOfPointsInPage) {
+          while (index < rows
+              && builder.getPositionCount() < maxNumberOfPointsInPage
+              && paginationController.hasCurLimit()) {
             long time = getTime(getScanOrderIndex(index));
             if (isCurrentTimeExceedTimeRange(time)) {
               break;
@@ -1034,6 +1074,12 @@ public abstract class TVList implements WALEntryValue {
                 && isTimeSatisfied(time)) {
               Binary binary = getBinary(getScanOrderIndex(index));
               if (pushDownFilter == null || pushDownFilter.satisfyBinary(time, 
binary)) {
+                if (paginationController.hasCurOffset()) {
+                  paginationController.consumeOffset();
+                  index++;
+                  continue;
+                }
+                paginationController.consumeLimit();
                 builder.getTimeColumnBuilder().writeLong(time);
                 builder.getColumnBuilder(0).writeBinary(binary);
                 builder.declarePosition();
@@ -1046,9 +1092,9 @@ public abstract class TVList implements WALEntryValue {
           throw new UnSupportedDataTypeException(
               String.format("Data type %s is not supported.", dataType));
       }
-      // There is no need to process pushDownFilter here because it has been 
applied when
-      // constructing the tsBlock
-      TsBlock tsBlock = paginationController.applyTsBlock(builder.build());
+      // There is no need to process pushDownFilter and paginationController 
here because both have
+      // been applied while constructing the tsBlock
+      TsBlock tsBlock = builder.build();
       addTsBlock(tsBlock);
       return tsBlock;
     }

Reply via email to