This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch skipNotSatisfiedTimeRange in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11e63cba39294526d9ad209e5044f08a3fdda205 Author: shuwenwei <[email protected]> AuthorDate: Mon Oct 13 17:37:53 2025 +0800 optimize nextBatch --- .../db/utils/datastructure/AlignedTVList.java | 31 +++++--- .../iotdb/db/utils/datastructure/TVList.java | 84 +++++++++++++++++----- 2 files changed, 88 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 9a7e86c99ba..3b80383a8bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -1875,7 +1875,8 @@ public abstract class AlignedTVList extends TVList { @Override public TsBlock nextBatch() { - TsBlockBuilder builder = new TsBlockBuilder(maxNumberOfPointsInPage, dataTypeList); + int maxRowCountOfCurrentBatch = Math.min(rows - index, maxNumberOfPointsInPage); + TsBlockBuilder builder = new TsBlockBuilder(maxRowCountOfCurrentBatch, dataTypeList); // Time column TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); @@ -2022,9 +2023,9 @@ public abstract class AlignedTVList extends TVList { TsBlock tsBlock = builder.build(); if (ignoreAllNullRows && needRebuildTsBlock(hasAnyNonNullValue)) { // if exist all null rows, at most have validRowCount - 1 valid rows + // When rebuilding TsBlock, pushDownFilter and paginationController are also processed. tsBlock = reBuildTsBlock(hasAnyNonNullValue, validRowCount, dataTypeList, tsBlock); - } - if (pushDownFilter != null) { + } else if (pushDownFilter != null) { tsBlock = TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( tsBlock, @@ -2086,21 +2087,35 @@ public abstract class AlignedTVList extends TVList { int previousValidRowCount, List<TSDataType> tsDataTypeList, TsBlock previousTsBlock) { + boolean[] selection = hasAnyNonNullValue; + if (pushDownFilter != null) { + selection = pushDownFilter.satisfyTsBlock(hasAnyNonNullValue, previousTsBlock); + } TsBlockBuilder builder = new TsBlockBuilder(previousValidRowCount - 1, tsDataTypeList); TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); Column timeColumn = previousTsBlock.getTimeColumn(); + int stopIndex = 0; for (int i = 0; i < previousValidRowCount; i++) { - if (hasAnyNonNullValue[i]) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - builder.declarePosition(); + if (selection[i]) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + selection[i] = false; + } else if (paginationController.hasCurLimit()) { + timeColumnBuilder.writeLong(timeColumn.getLong(i)); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + stopIndex = i; + break; + } } } for (int columnIndex = 0; columnIndex < tsDataTypeList.size(); columnIndex++) { ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex); Column column = previousTsBlock.getColumn(columnIndex); - for (int i = 0; i < previousValidRowCount; i++) { - if (hasAnyNonNullValue[i]) { + for (int i = 0; i < stopIndex; i++) { + if (selection[i]) { if (column.isNull(i)) { columnBuilder.appendNull(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 7b98c548867..f4a571be220 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -787,15 +787,6 @@ public abstract class TVList implements WALEntryValue { } } int newIndex = getScanOrderIndex(indexInTVList); - System.out.println( - "skip to newIndex: " - + newIndex - + ", old index is " - + getScanOrderIndex(index) - + ", current time range is " - + timeRange - + ", current time is " - + getTime(getScanOrderIndex(newIndex))); if (newIndex > index) { index = newIndex; } @@ -912,10 +903,19 @@ public abstract class TVList implements WALEntryValue { @Override public TsBlock nextBatch() { TSDataType dataType = getDataType(); - TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); + int maxRowCountOfCurrentBatch = + Math.min( + paginationController.hasLimit() + ? (int) paginationController.getCurLimit() + : Integer.MAX_VALUE, + Math.min(maxNumberOfPointsInPage, rows - index)); + TsBlockBuilder builder = + new TsBlockBuilder(maxRowCountOfCurrentBatch, Collections.singletonList(dataType)); switch (dataType) { case BOOLEAN: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -926,6 +926,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { boolean aBoolean = getBoolean(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyBoolean(time, aBoolean)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeBoolean(aBoolean); builder.declarePosition(); @@ -936,7 +942,9 @@ public abstract class TVList implements WALEntryValue { break; case INT32: case DATE: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -947,6 +955,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { int anInt = getInt(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyInteger(time, anInt)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeInt(anInt); builder.declarePosition(); @@ -957,7 +971,9 @@ public abstract class TVList implements WALEntryValue { break; case INT64: case TIMESTAMP: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -968,6 +984,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { long aLong = getLong(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyLong(time, aLong)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeLong(aLong); builder.declarePosition(); @@ -977,7 +999,9 @@ public abstract class TVList implements WALEntryValue { } break; case FLOAT: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -990,6 +1014,12 @@ public abstract class TVList implements WALEntryValue { roundValueWithGivenPrecision( getFloat(getScanOrderIndex(index)), floatPrecision, encoding); if (pushDownFilter == null || pushDownFilter.satisfyFloat(time, aFloat)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeFloat(aFloat); builder.declarePosition(); @@ -999,7 +1029,9 @@ public abstract class TVList implements WALEntryValue { } break; case DOUBLE: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -1012,6 +1044,12 @@ public abstract class TVList implements WALEntryValue { roundValueWithGivenPrecision( getDouble(getScanOrderIndex(index)), floatPrecision, encoding); if (pushDownFilter == null || pushDownFilter.satisfyDouble(time, aDouble)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeDouble(aDouble); builder.declarePosition(); @@ -1023,7 +1061,9 @@ public abstract class TVList implements WALEntryValue { case TEXT: case BLOB: case STRING: - while (index < rows && builder.getPositionCount() < maxNumberOfPointsInPage) { + while (index < rows + && builder.getPositionCount() < maxNumberOfPointsInPage + && paginationController.hasCurLimit()) { long time = getTime(getScanOrderIndex(index)); if (isCurrentTimeExceedTimeRange(time)) { break; @@ -1034,6 +1074,12 @@ public abstract class TVList implements WALEntryValue { && isTimeSatisfied(time)) { Binary binary = getBinary(getScanOrderIndex(index)); if (pushDownFilter == null || pushDownFilter.satisfyBinary(time, binary)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + index++; + continue; + } + paginationController.consumeLimit(); builder.getTimeColumnBuilder().writeLong(time); builder.getColumnBuilder(0).writeBinary(binary); builder.declarePosition(); @@ -1046,9 +1092,9 @@ public abstract class TVList implements WALEntryValue { throw new UnSupportedDataTypeException( String.format("Data type %s is not supported.", dataType)); } - // There is no need to process pushDownFilter here because it has been applied when - // constructing the tsBlock - TsBlock tsBlock = paginationController.applyTsBlock(builder.build()); + // There is no need to process pushDownFilter and paginationController here because it has + // been applied when constructing the tsBlock + TsBlock tsBlock = builder.build(); addTsBlock(tsBlock); return tsBlock; }
