This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b5dfbfd4129 [IOTDB-6299] Fix bug in merging overlapped data process
caused by filter & offset push down
b5dfbfd4129 is described below
commit b5dfbfd4129bca50b341e111b8a2a470ce87e2e6
Author: liuminghui233 <[email protected]>
AuthorDate: Fri Feb 23 08:56:46 2024 +0800
[IOTDB-6299] Fix bug in merging overlapped data process caused by filter &
offset push down
---
.../db/it/aligned/IoTDBAlignedSeriesQueryIT.java | 48 +++++++++++++++++++
.../execution/operator/source/SeriesScanUtil.java | 38 ++++++---------
.../AlignedSeriesScanPredicatePushDownTest.java | 7 ++-
.../series/SeriesScanLimitOffsetPushDownTest.java | 5 ++
.../series/SeriesScanPredicatePushDownTest.java | 23 ++++++---
.../tsfile/read/common/block/TsBlockUtil.java | 55 ++++++++++++++++++++++
.../tsfile/read/reader/page/AlignedPageReader.java | 54 +++------------------
7 files changed, 150 insertions(+), 80 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
index 18530ff77f4..016e2d59daa 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
@@ -1014,6 +1014,54 @@ public class IoTDBAlignedSeriesQueryIT {
}
}
+ @Test
+ public void selectAllAlignedWithLimitOffsetTest() {
+
+ String[] retArray =
+ new String[] {
+ "14,14.0,14,14,null,null",
+ "15,15.0,15,15,null,null",
+ "16,16.0,16,16,null,null",
+ "17,17.0,17,17,null,null",
+ "18,18.0,18,18,null,null",
+ };
+
+ String[] columnNames = {
+ "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4",
"root.sg1.d1.s5"
+ };
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ try (ResultSet resultSet =
+ statement.executeQuery(
+ "select * from root.sg1.d1 where time >= 9 and time <= 33 offset
5 limit 5")) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1,
resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
@Test
public void selectSomeAlignedWithValueFilterTest1() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 77375d41fac..cd1dc52e575 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -643,7 +644,9 @@ public class SeriesScanUtil {
if (hasCachedNextOverlappedPage) {
hasCachedNextOverlappedPage = false;
- TsBlock res = cachedTsBlock;
+ TsBlock res =
+ applyPushDownFilterAndLimitOffset(
+ cachedTsBlock, scanOptions.getPushDownFilter(),
paginationController);
cachedTsBlock = null;
// cached tsblock has handled by pagination controller & push down
filter, return directly
@@ -672,6 +675,15 @@ public class SeriesScanUtil {
}
}
+ private TsBlock applyPushDownFilterAndLimitOffset(
+ TsBlock tsBlock, Filter pushDownFilter, PaginationController
paginationController) {
+ if (pushDownFilter == null) {
+ return paginationController.applyTsBlock(tsBlock);
+ }
+ return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+ tsBlock, new TsBlockBuilder(getTsDataTypeList()), pushDownFilter,
paginationController);
+ }
+
private void filterFirstPageReader() {
if (firstPageReader == null) {
return;
@@ -708,7 +720,6 @@ public class SeriesScanUtil {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
private boolean hasNextOverlappedPage() throws IOException {
long startTime = System.nanoTime();
- Filter pushDownFilter = scanOptions.getPushDownFilter();
try {
if (hasCachedNextOverlappedPage) {
return true;
@@ -810,9 +821,7 @@ public class SeriesScanUtil {
// get the latest first point in mergeReader
timeValuePair = mergeReader.nextTimeValuePair();
- if (processFilterAndPagination(timeValuePair, pushDownFilter,
builder)) {
- break;
- }
+ addTimeValuePairToResult(timeValuePair, builder);
}
hasCachedNextOverlappedPage = !builder.isEmpty();
cachedTsBlock = builder.build();
@@ -875,25 +884,6 @@ public class SeriesScanUtil {
unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
}
- private boolean processFilterAndPagination(
- TimeValuePair timeValuePair, Filter pushDownFilter, TsBlockBuilder
builder) {
- if (pushDownFilter != null
- && !pushDownFilter.satisfyRow(timeValuePair.getTimestamp(),
timeValuePair.getValues())) {
- return false;
- }
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- return false;
- }
- if (paginationController.hasCurLimit()) {
- addTimeValuePairToResult(timeValuePair, builder);
- paginationController.consumeLimit();
- return false;
- } else {
- return true;
- }
- }
-
private void addTimeValuePairToResult(TimeValuePair timeValuePair,
TsBlockBuilder builder) {
builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp());
switch (dataType) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
index 09f848bf3e6..1f22f01c20c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanPredicatePushDownTest.java
@@ -216,7 +216,12 @@ public class AlignedSeriesScanPredicatePushDownTest
extends AbstractAlignedSerie
Assert.assertTrue(seriesScanUtil.hasNextPage());
Assert.assertFalse(seriesScanUtil.canUseCurrentPageStatistics());
tsBlock = seriesScanUtil.nextPage();
- Assert.assertNull(tsBlock);
+ Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+
+ Assert.assertTrue(seriesScanUtil.hasNextPage());
+ Assert.assertFalse(seriesScanUtil.canUseCurrentPageStatistics());
+ tsBlock = seriesScanUtil.nextPage();
+ Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
Assert.assertFalse(seriesScanUtil.hasNextPage());
Assert.assertFalse(seriesScanUtil.hasNextChunk());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
index dcd16984464..2043e747f0f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java
@@ -215,6 +215,11 @@ public class SeriesScanLimitOffsetPushDownTest extends
AbstractSeriesScanTest {
TsBlock tsBlock = seriesScanUtil.nextPage();
Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+ Assert.assertTrue(seriesScanUtil.hasNextPage());
+
+ tsBlock = seriesScanUtil.nextPage();
+ Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+
Assert.assertFalse(seriesScanUtil.hasNextPage());
Assert.assertTrue(seriesScanUtil.hasNextChunk());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
index 020361bcf23..b9d4560cf87 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanPredicatePushDownTest.java
@@ -294,24 +294,33 @@ public class SeriesScanPredicatePushDownTest extends
AbstractSeriesScanTest {
@Test
public void testSkipMergeReaderByGlobalTimeFilter() throws
IllegalPathException, IOException {
SeriesScanUtil seriesScanUtil = getSeriesScanUtil(TimeFilterApi.gtEq(60),
null);
- checkFile1AndFile2AndMergeReaderSkipped(seriesScanUtil);
+ checkFile1AndFile2AndFile3Chunk1Skipped(seriesScanUtil);
+
+ // (File 3 - Chunk 1) merge (File 4 - Chunk 1) skipped
+ // File 4 - Chunk 2
+ Assert.assertTrue(seriesScanUtil.hasNextPage());
+ Assert.assertTrue(seriesScanUtil.canUseCurrentPageStatistics());
+ TsBlock tsBlock = seriesScanUtil.nextPage();
+ Assert.assertEquals(10, tsBlock.getPositionCount());
+ Assert.assertEquals(60, tsBlock.getTimeByIndex(0));
}
@Test
public void testSkipMergeReaderByPushDownFilter() throws
IllegalPathException, IOException {
SeriesScanUtil seriesScanUtil = getSeriesScanUtil(TimeFilterApi.gt(0),
ValueFilterApi.gtEq(60));
- checkFile1AndFile2AndMergeReaderSkipped(seriesScanUtil);
- }
- private void checkFile1AndFile2AndMergeReaderSkipped(SeriesScanUtil
seriesScanUtil)
- throws IOException {
checkFile1AndFile2AndFile3Chunk1Skipped(seriesScanUtil);
- // (File 3 - Chunk 1) merge (File 4 - Chunk 1) skipped
+ // (File 3 - Chunk 1) merge (File 4 - Chunk 1)
+ Assert.assertTrue(seriesScanUtil.hasNextPage());
+ Assert.assertFalse(seriesScanUtil.canUseCurrentPageStatistics());
+ TsBlock tsBlock = seriesScanUtil.nextPage();
+ Assert.assertTrue(tsBlock == null || tsBlock.isEmpty());
+
// File 4 - Chunk 2
Assert.assertTrue(seriesScanUtil.hasNextPage());
Assert.assertTrue(seriesScanUtil.canUseCurrentPageStatistics());
- TsBlock tsBlock = seriesScanUtil.nextPage();
+ tsBlock = seriesScanUtil.nextPage();
Assert.assertEquals(10, tsBlock.getPositionCount());
Assert.assertEquals(60, tsBlock.getTimeByIndex(0));
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
index 8e10bcea023..3bf472088a7 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockUtil.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.tsfile.read.common.block;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
public class TsBlockUtil {
@@ -65,4 +67,57 @@ public class TsBlockUtil {
}
return left;
}
+
+ public static TsBlock applyFilterAndLimitOffsetToTsBlock(
+ TsBlock unFilteredBlock,
+ TsBlockBuilder builder,
+ Filter pushDownFilter,
+ PaginationController paginationController) {
+ boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);
+
+ // construct time column
+ int readEndIndex =
+ buildTimeColumnWithPagination(
+ unFilteredBlock, builder, keepCurrentRow, paginationController);
+
+ // construct value columns
+ for (int i = 0; i < builder.getValueColumnBuilders().length; i++) {
+ for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
+ if (keepCurrentRow[rowIndex]) {
+ if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
+ builder.getColumnBuilder(i).appendNull();
+ } else {
+ builder
+ .getColumnBuilder(i)
+
.writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
+ }
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ private static int buildTimeColumnWithPagination(
+ TsBlock unFilteredBlock,
+ TsBlockBuilder builder,
+ boolean[] keepCurrentRow,
+ PaginationController paginationController) {
+ int readEndIndex = unFilteredBlock.getPositionCount();
+ for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
+ if (keepCurrentRow[rowIndex]) {
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ keepCurrentRow[rowIndex] = false;
+ } else if (paginationController.hasCurLimit()) {
+
builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
+ builder.declarePosition();
+ paginationController.consumeLimit();
+ } else {
+ readEndIndex = rowIndex;
+ break;
+ }
+ }
+ }
+ return readEndIndex;
+ }
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index b32a64cfdb9..c24b76798e4 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -205,11 +206,14 @@ public class AlignedPageReader implements IPageReader {
// construct value columns
buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);
+ TsBlock unFilteredBlock = builder.build();
if (pushDownFilterAllSatisfy) {
// OFFSET & LIMIT has been consumed in buildTimeColumn
- return builder.build();
+ return unFilteredBlock;
}
- return applyPushDownFilter();
+ builder.reset();
+ return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+ unFilteredBlock, builder, pushDownFilter, paginationController);
}
private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) {
@@ -279,26 +283,6 @@ public class AlignedPageReader implements IPageReader {
return readEndIndex;
}
- private int buildTimeColumnWithPagination(TsBlock unFilteredBlock, boolean[]
keepCurrentRow) {
- int readEndIndex = unFilteredBlock.getPositionCount();
- for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
- if (keepCurrentRow[rowIndex]) {
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- keepCurrentRow[rowIndex] = false;
- } else if (paginationController.hasCurLimit()) {
-
builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
- builder.declarePosition();
- paginationController.consumeLimit();
- } else {
- readEndIndex = rowIndex;
- break;
- }
- }
- }
- return readEndIndex;
- }
-
private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[]
keepCurrentRow) {
int readEndIndex = 0;
for (int i = 0; i < timeBatch.length; i++) {
@@ -386,32 +370,6 @@ public class AlignedPageReader implements IPageReader {
}
}
- private TsBlock applyPushDownFilter() {
- TsBlock unFilteredBlock = builder.build();
- builder.reset();
-
- boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);
-
- // construct time column
- int readEndIndex = buildTimeColumnWithPagination(unFilteredBlock,
keepCurrentRow);
-
- // construct value columns
- for (int i = 0; i < valueCount; i++) {
- for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
- if (keepCurrentRow[rowIndex]) {
- if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
- builder.getColumnBuilder(i).appendNull();
- } else {
- builder
- .getColumnBuilder(i)
-
.writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
- }
- }
- }
- }
- return builder.build();
- }
-
public void setDeleteIntervalList(List<List<TimeRange>> list) {
for (int i = 0; i < valueCount; i++) {
if (valuePageReaderList.get(i) != null) {