This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b5dfbfd4129 [IOTDB-6299] Fix bug in merging overlapped data process 
caused by filter & offset push down
b5dfbfd4129 is described below

commit b5dfbfd4129bca50b341e111b8a2a470ce87e2e6
Author: liuminghui233 <[email protected]>
AuthorDate: Fri Feb 23 08:56:46 2024 +0800

    [IOTDB-6299] Fix bug in merging overlapped data process caused by filter & 
offset push down
---
 .../db/it/aligned/IoTDBAlignedSeriesQueryIT.java   | 48 +++++++++++++++++++
 .../execution/operator/source/SeriesScanUtil.java  | 38 ++++++---------
 .../AlignedSeriesScanPredicatePushDownTest.java    |  7 ++-
 .../series/SeriesScanLimitOffsetPushDownTest.java  |  5 ++
 .../series/SeriesScanPredicatePushDownTest.java    | 23 ++++++---
 .../tsfile/read/common/block/TsBlockUtil.java      | 55 ++++++++++++++++++++++
 .../tsfile/read/reader/page/AlignedPageReader.java | 54 +++------------------
 7 files changed, 150 insertions(+), 80 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
index 18530ff77f4..016e2d59daa 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
@@ -1014,6 +1014,54 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
+  @Test
+  public void selectAllAlignedWithLimitOffsetTest() {
+
+    String[] retArray =
+        new String[] {
+          "14,14.0,14,14,null,null",
+          "15,15.0,15,15,null,null",
+          "16,16.0,16,16,null,null",
+          "17,17.0,17,17,null,null",
+          "18,18.0,18,18,null,null",
+        };
+
+    String[] columnNames = {
+      "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", 
"root.sg1.d1.s5"
+    };
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select * from root.sg1.d1 where time >= 9 and time <= 33 offset 
5 limit 5")) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, 
resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
   @Test
   public void selectSomeAlignedWithValueFilterTest1() {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 77375d41fac..cd1dc52e575 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -643,7 +644,9 @@ public class SeriesScanUtil {
 
     if (hasCachedNextOverlappedPage) {
       hasCachedNextOverlappedPage = false;
-      TsBlock res = cachedTsBlock;
+      TsBlock res =
+          applyPushDownFilterAndLimitOffset(
+              cachedTsBlock, scanOptions.getPushDownFilter(), 
paginationController);
       cachedTsBlock = null;
 
       // cached tsblock has handled by pagination controller & push down 
filter, return directly
@@ -672,6 +675,15 @@ public class SeriesScanUtil {
     }
   }
 
+  private TsBlock applyPushDownFilterAndLimitOffset(
+      TsBlock tsBlock, Filter pushDownFilter, PaginationController 
paginationController) {
+    if (pushDownFilter == null) {
+      return paginationController.applyTsBlock(tsBlock);
+    }
+    return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+        tsBlock, new TsBlockBuilder(getTsDataTypeList()), pushDownFilter, 
paginationController);
+  }
+
   private void filterFirstPageReader() {
     if (firstPageReader == null) {
       return;
@@ -708,7 +720,6 @@ public class SeriesScanUtil {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   private boolean hasNextOverlappedPage() throws IOException {
     long startTime = System.nanoTime();
-    Filter pushDownFilter = scanOptions.getPushDownFilter();
     try {
       if (hasCachedNextOverlappedPage) {
         return true;
@@ -810,9 +821,7 @@ public class SeriesScanUtil {
 
             // get the latest first point in mergeReader
             timeValuePair = mergeReader.nextTimeValuePair();
-            if (processFilterAndPagination(timeValuePair, pushDownFilter, 
builder)) {
-              break;
-            }
+            addTimeValuePairToResult(timeValuePair, builder);
           }
           hasCachedNextOverlappedPage = !builder.isEmpty();
           cachedTsBlock = builder.build();
@@ -875,25 +884,6 @@ public class SeriesScanUtil {
     unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
   }
 
-  private boolean processFilterAndPagination(
-      TimeValuePair timeValuePair, Filter pushDownFilter, TsBlockBuilder 
builder) {
-    if (pushDownFilter != null
-        && !pushDownFilter.satisfyRow(timeValuePair.getTimestamp(), 
timeValuePair.getValues())) {
-      return false;
-    }
-    if (paginationController.hasCurOffset()) {
-      paginationController.consumeOffset();
-      return false;
-    }
-    if (paginationController.hasCurLimit()) {
-      addTimeValuePairToResult(timeValuePair, builder);
-      paginationController.consumeLimit();
-      return false;
-    } else {
-      return true;
-    }
-  }
-
   private void addTimeValuePairToResult(TimeValuePair timeValuePair, 
TsBlockBuilder builder) {
     builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp());
     switch (dataType) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
index 09f848bf3e6..1f22f01c20c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
@@ -216,7 +216,12 @@ public class AlignedSeriesScanPredicatePushDownTest 
extends AbstractAlignedSerie
     Assert.assertTrue(seriesScanUtil.hasNextPage());
     Assert.assertFalse(seriesScanUtil.canUseCurrentPageStatistics());
     tsBlock = seriesScanUtil.nextPage();
-    Assert.assertNull(tsBlock);
+    Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+    Assert.assertFalse(seriesScanUtil.canUseCurrentPageStatistics());
+    tsBlock = seriesScanUtil.nextPage();
+    Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
 
     Assert.assertFalse(seriesScanUtil.hasNextPage());
     Assert.assertFalse(seriesScanUtil.hasNextChunk());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
index dcd16984464..2043e747f0f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
@@ -215,6 +215,11 @@ public class SeriesScanLimitOffsetPushDownTest extends 
AbstractSeriesScanTest {
     TsBlock tsBlock = seriesScanUtil.nextPage();
     Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
 
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+
+    tsBlock = seriesScanUtil.nextPage();
+    Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+
     Assert.assertFalse(seriesScanUtil.hasNextPage());
 
     Assert.assertTrue(seriesScanUtil.hasNextChunk());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
index 020361bcf23..b9d4560cf87 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
@@ -294,24 +294,33 @@ public class SeriesScanPredicatePushDownTest extends 
AbstractSeriesScanTest {
   @Test
   public void testSkipMergeReaderByGlobalTimeFilter() throws 
IllegalPathException, IOException {
     SeriesScanUtil seriesScanUtil = getSeriesScanUtil(TimeFilterApi.gtEq(60), 
null);
-    checkFile1AndFile2AndMergeReaderSkipped(seriesScanUtil);
+    checkFile1AndFile2AndFile3Chunk1Skipped(seriesScanUtil);
+
+    // (File 3 - Chunk 1) merge (File 4 - Chunk 1) skipped
+    // File 4 - Chunk 2
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+    Assert.assertTrue(seriesScanUtil.canUseCurrentPageStatistics());
+    TsBlock tsBlock = seriesScanUtil.nextPage();
+    Assert.assertEquals(10, tsBlock.getPositionCount());
+    Assert.assertEquals(60, tsBlock.getTimeByIndex(0));
   }
 
   @Test
   public void testSkipMergeReaderByPushDownFilter() throws 
IllegalPathException, IOException {
     SeriesScanUtil seriesScanUtil = getSeriesScanUtil(TimeFilterApi.gt(0), 
ValueFilterApi.gtEq(60));
-    checkFile1AndFile2AndMergeReaderSkipped(seriesScanUtil);
-  }
 
-  private void checkFile1AndFile2AndMergeReaderSkipped(SeriesScanUtil 
seriesScanUtil)
-      throws IOException {
     checkFile1AndFile2AndFile3Chunk1Skipped(seriesScanUtil);
 
-    // (File 3 - Chunk 1) merge (File 4 - Chunk 1) skipped
+    // (File 3 - Chunk 1) merge (File 4 - Chunk 1)
+    Assert.assertTrue(seriesScanUtil.hasNextPage());
+    Assert.assertFalse(seriesScanUtil.canUseCurrentPageStatistics());
+    TsBlock tsBlock = seriesScanUtil.nextPage();
+    Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+
     // File 4 - Chunk 2
     Assert.assertTrue(seriesScanUtil.hasNextPage());
     Assert.assertTrue(seriesScanUtil.canUseCurrentPageStatistics());
-    TsBlock tsBlock = seriesScanUtil.nextPage();
+    tsBlock = seriesScanUtil.nextPage();
     Assert.assertEquals(10, tsBlock.getPositionCount());
     Assert.assertEquals(60, tsBlock.getTimeByIndex(0));
   }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
index 8e10bcea023..3bf472088a7 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.tsfile.read.common.block;
 
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
 
 public class TsBlockUtil {
 
@@ -65,4 +67,57 @@ public class TsBlockUtil {
     }
     return left;
   }
+
+  public static TsBlock applyFilterAndLimitOffsetToTsBlock(
+      TsBlock unFilteredBlock,
+      TsBlockBuilder builder,
+      Filter pushDownFilter,
+      PaginationController paginationController) {
+    boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);
+
+    // construct time column
+    int readEndIndex =
+        buildTimeColumnWithPagination(
+            unFilteredBlock, builder, keepCurrentRow, paginationController);
+
+    // construct value columns
+    for (int i = 0; i < builder.getValueColumnBuilders().length; i++) {
+      for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
+        if (keepCurrentRow[rowIndex]) {
+          if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
+            builder.getColumnBuilder(i).appendNull();
+          } else {
+            builder
+                .getColumnBuilder(i)
+                
.writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
+          }
+        }
+      }
+    }
+    return builder.build();
+  }
+
+  private static int buildTimeColumnWithPagination(
+      TsBlock unFilteredBlock,
+      TsBlockBuilder builder,
+      boolean[] keepCurrentRow,
+      PaginationController paginationController) {
+    int readEndIndex = unFilteredBlock.getPositionCount();
+    for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
+      if (keepCurrentRow[rowIndex]) {
+        if (paginationController.hasCurOffset()) {
+          paginationController.consumeOffset();
+          keepCurrentRow[rowIndex] = false;
+        } else if (paginationController.hasCurLimit()) {
+          
builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
+          builder.declarePosition();
+          paginationController.consumeLimit();
+        } else {
+          readEndIndex = rowIndex;
+          break;
+        }
+      }
+    }
+    return readEndIndex;
+  }
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index b32a64cfdb9..c24b76798e4 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -205,11 +206,14 @@ public class AlignedPageReader implements IPageReader {
     // construct value columns
     buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);
 
+    TsBlock unFilteredBlock = builder.build();
     if (pushDownFilterAllSatisfy) {
       // OFFSET & LIMIT has been consumed in buildTimeColumn
-      return builder.build();
+      return unFilteredBlock;
     }
-    return applyPushDownFilter();
+    builder.reset();
+    return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+        unFilteredBlock, builder, pushDownFilter, paginationController);
   }
 
   private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) {
@@ -279,26 +283,6 @@ public class AlignedPageReader implements IPageReader {
     return readEndIndex;
   }
 
-  private int buildTimeColumnWithPagination(TsBlock unFilteredBlock, boolean[] 
keepCurrentRow) {
-    int readEndIndex = unFilteredBlock.getPositionCount();
-    for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
-      if (keepCurrentRow[rowIndex]) {
-        if (paginationController.hasCurOffset()) {
-          paginationController.consumeOffset();
-          keepCurrentRow[rowIndex] = false;
-        } else if (paginationController.hasCurLimit()) {
-          
builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
-          builder.declarePosition();
-          paginationController.consumeLimit();
-        } else {
-          readEndIndex = rowIndex;
-          break;
-        }
-      }
-    }
-    return readEndIndex;
-  }
-
   private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] 
keepCurrentRow) {
     int readEndIndex = 0;
     for (int i = 0; i < timeBatch.length; i++) {
@@ -386,32 +370,6 @@ public class AlignedPageReader implements IPageReader {
     }
   }
 
-  private TsBlock applyPushDownFilter() {
-    TsBlock unFilteredBlock = builder.build();
-    builder.reset();
-
-    boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);
-
-    // construct time column
-    int readEndIndex = buildTimeColumnWithPagination(unFilteredBlock, 
keepCurrentRow);
-
-    // construct value columns
-    for (int i = 0; i < valueCount; i++) {
-      for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
-        if (keepCurrentRow[rowIndex]) {
-          if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
-            builder.getColumnBuilder(i).appendNull();
-          } else {
-            builder
-                .getColumnBuilder(i)
-                
.writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
-          }
-        }
-      }
-    }
-    return builder.build();
-  }
-
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
     for (int i = 0; i < valueCount; i++) {
       if (valuePageReaderList.get(i) != null) {

Reply via email to