This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch LimitPushDownBug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 556def6112037536f8bd6b81e3fcb468e0430f98 Author: JackieTien97 <[email protected]> AuthorDate: Wed Nov 22 20:23:52 2023 +0800 Fix limit push down bug in aligned sensor --- .../db/it/aligned/IoTDBAlignedLimitPushDownIT.java | 109 ++++++++++++++++++++ .../tsfile/read/reader/page/AlignedPageReader.java | 87 +++++++++++----- .../tsfile/read/reader/page/ValuePageReader.java | 110 +++++++++++++-------- 3 files changed, 240 insertions(+), 66 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedLimitPushDownIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedLimitPushDownIT.java new file mode 100644 index 00000000000..381dbbb9949 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedLimitPushDownIT.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aligned; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBAlignedLimitPushDownIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(1,1,1)"); + statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(2,2,2)"); + statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(3,3,3)"); + statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(4,4,4)"); + statement.addBatch("flush"); + + statement.executeBatch(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void selectWithLimitPushDownTest() { + + String[] retArray = new String[] {"3,3.0", "4,4.0"}; + + String[] columnNames = {"root.db.d1.s1"}; + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + try (ResultSet resultSet = + statement.executeQuery("select s1 from root.db.d1 where time >= 3 limit 2;")) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + Map<String, Integer> map = new HashMap<>(); + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + map.put(resultSetMetaData.getColumnName(i), i); + } + assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount()); + int cnt = 0; + while (resultSet.next()) { + StringBuilder builder = new StringBuilder(); + builder.append(resultSet.getString(1)); + for (String columnName : columnNames) { + int index = map.get(columnName); + builder.append(",").append(resultSet.getString(index)); + } + assertEquals(retArray[cnt], builder.toString()); + cnt++; + } + assertEquals(retArray.length, cnt); + } + + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index a2c8909dc19..fadc3009b63 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -191,39 +191,71 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { long[] timeBatch = timePageReader.getNextTimeBatch(); if (canGoFastWay()) { - // skip all the page - if (paginationController.hasCurOffset(timeBatch.length)) { - paginationController.consumeOffset(timeBatch.length); - } else { - int readStartIndex = - paginationController.hasCurOffset() ? (int) paginationController.getCurOffset() : 0; - // consume the remaining offset - paginationController.consumeOffset(readStartIndex); - // not included - int readEndIndex = - (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) - && (paginationController.getCurLimit() < timeBatch.length - readStartIndex + 1) - ? readStartIndex + (int) paginationController.getCurLimit() - : timeBatch.length; - if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) { - paginationController.consumeLimit((long) readEndIndex - readStartIndex); - } + // all page data satisfy + if (filter.allSatisfy(getTimeStatistics())) { + // skip all the page + if (paginationController.hasCurOffset(timeBatch.length)) { + paginationController.consumeOffset(timeBatch.length); + } else { + int readStartIndex = + paginationController.hasCurOffset() ? (int) paginationController.getCurOffset() : 0; + // consume the remaining offset + paginationController.consumeOffset(readStartIndex); + // not included + int readEndIndex = + (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) + && (paginationController.getCurLimit() + < timeBatch.length - readStartIndex + 1) + ? readStartIndex + (int) paginationController.getCurLimit() + : timeBatch.length; + if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) { + paginationController.consumeLimit((long) readEndIndex - readStartIndex); + } - boolean[] keepCurrentRow = new boolean[readEndIndex - readStartIndex]; - if (filter == null) { - Arrays.fill(keepCurrentRow, true); // construct time column for (int i = readStartIndex; i < readEndIndex; i++) { builder.getTimeColumnBuilder().writeLong(timeBatch[i]); builder.declarePosition(); } + + // construct value columns + for (int i = 0; i < valueCount; i++) { + ValuePageReader pageReader = valuePageReaderList.get(i); + if (pageReader != null) { + pageReader.writeColumnBuilderWithNextBatch( + readStartIndex, readEndIndex, builder.getColumnBuilder(i)); + } else { + builder.getColumnBuilder(i).appendNull(readEndIndex - readStartIndex); + } + } + } + } else { + + // if all the sub sensors' value are null in current row, just discard it + // if !filter.satisfy, discard this row + boolean[] keepCurrentRow = new boolean[timeBatch.length]; + if (filter == null) { + Arrays.fill(keepCurrentRow, true); } else { - for (int i = readStartIndex; i < readEndIndex; i++) { - keepCurrentRow[i - readStartIndex] = filter.satisfy(timeBatch[i], null); - // construct time column - if (keepCurrentRow[i - readStartIndex]) { + for (int i = 0, n = timeBatch.length; i < n; i++) { + keepCurrentRow[i] = filter.satisfy(timeBatch[i], null); + } + } + + // construct time column + int readEndIndex = timeBatch.length; + for (int i = 0; i < timeBatch.length; i++) { + if (keepCurrentRow[i]) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + keepCurrentRow[i] = false; + } else if (paginationController.hasCurLimit()) { builder.getTimeColumnBuilder().writeLong(timeBatch[i]); builder.declarePosition(); + paginationController.consumeLimit(); + } else { + readEndIndex = i; + break; } } } @@ -233,16 +265,17 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { ValuePageReader pageReader = valuePageReaderList.get(i); if (pageReader != null) { pageReader.writeColumnBuilderWithNextBatch( - readStartIndex, readEndIndex, builder.getColumnBuilder(i), keepCurrentRow); + readEndIndex, builder.getColumnBuilder(i), keepCurrentRow); } else { - for (int j = readStartIndex; j < readEndIndex; j++) { - if (keepCurrentRow[j - readStartIndex]) { + for (int j = 0; j < readEndIndex; j++) { + if (keepCurrentRow[j]) { builder.getColumnBuilder(i).appendNull(); } } } } } + } else { // if all the sub sensors' value are null in current row, just discard it // if !filter.satisfy, discard this row diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java index c158e3077eb..e362c173e31 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java @@ -330,13 +330,69 @@ public class ValuePageReader { } public void writeColumnBuilderWithNextBatch( - int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder, boolean[] satisfied) { + int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow) { if (valueBuffer == null) { - for (int i = readStartIndex; i < readEndIndex; i++) { - if (satisfied[i - readStartIndex]) { + for (int i = 0; i < readEndIndex; i++) { + if (keepCurrentRow[i]) { + columnBuilder.appendNull(); + } + } + return; + } + for (int i = 0; i < readEndIndex; i++) { + if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { + if (keepCurrentRow[i]) { columnBuilder.appendNull(); } + continue; + } + switch (dataType) { + case BOOLEAN: + boolean aBoolean = valueDecoder.readBoolean(valueBuffer); + if (keepCurrentRow[i]) { + columnBuilder.writeBoolean(aBoolean); + } + break; + case INT32: + int anInt = valueDecoder.readInt(valueBuffer); + if (keepCurrentRow[i]) { + columnBuilder.writeInt(anInt); + } + break; + case INT64: + long aLong = valueDecoder.readLong(valueBuffer); + if (keepCurrentRow[i]) { + columnBuilder.writeLong(aLong); + } + break; + case FLOAT: + float aFloat = valueDecoder.readFloat(valueBuffer); + if (keepCurrentRow[i]) { + columnBuilder.writeFloat(aFloat); + } + break; + case DOUBLE: + double aDouble = valueDecoder.readDouble(valueBuffer); + if (keepCurrentRow[i]) { + columnBuilder.writeDouble(aDouble); + } + break; + case TEXT: + Binary aBinary = valueDecoder.readBinary(valueBuffer); + if (keepCurrentRow[i]) { + columnBuilder.writeBinary(aBinary); + } + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); } + } + } + + public void writeColumnBuilderWithNextBatch( + int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder) { + if (valueBuffer == null) { + columnBuilder.appendNull(readEndIndex - readStartIndex); return; } @@ -349,15 +405,11 @@ public class ValuePageReader { for (int i = readStartIndex; i < readEndIndex; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { - if (satisfied[i - readStartIndex]) { - columnBuilder.appendNull(); - } + columnBuilder.appendNull(); continue; } boolean aBoolean = valueDecoder.readBoolean(valueBuffer); - if (satisfied[i - readStartIndex]) { - columnBuilder.writeBoolean(aBoolean); - } + columnBuilder.writeBoolean(aBoolean); } break; case INT32: @@ -368,15 +420,11 @@ public class ValuePageReader { for (int i = readStartIndex; i < readEndIndex; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { - if (satisfied[i - readStartIndex]) { - columnBuilder.appendNull(); - } + columnBuilder.appendNull(); continue; } int aInt = valueDecoder.readInt(valueBuffer); - if (satisfied[i - readStartIndex]) { - columnBuilder.writeInt(aInt); - } + columnBuilder.writeInt(aInt); } break; case INT64: @@ -387,15 +435,11 @@ public class ValuePageReader { for (int i = readStartIndex; i < readEndIndex; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { - if (satisfied[i - readStartIndex]) { - columnBuilder.appendNull(); - } + columnBuilder.appendNull(); continue; } long aLong = valueDecoder.readLong(valueBuffer); - if (satisfied[i - readStartIndex]) { - columnBuilder.writeLong(aLong); - } + columnBuilder.writeLong(aLong); } break; case FLOAT: @@ -406,15 +450,11 @@ public class ValuePageReader { for (int i = readStartIndex; i < readEndIndex; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { - if (satisfied[i - readStartIndex]) { - columnBuilder.appendNull(); - } + columnBuilder.appendNull(); continue; } float aFloat = valueDecoder.readFloat(valueBuffer); - if (satisfied[i - readStartIndex]) { - columnBuilder.writeFloat(aFloat); - } + columnBuilder.writeFloat(aFloat); } break; case DOUBLE: @@ -425,15 +465,11 @@ public class ValuePageReader { for (int i = readStartIndex; i < readEndIndex; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { - if (satisfied[i - readStartIndex]) { - columnBuilder.appendNull(); - } + columnBuilder.appendNull(); continue; } double aDouble = valueDecoder.readDouble(valueBuffer); - if (satisfied[i - readStartIndex]) { - columnBuilder.writeDouble(aDouble); - } + columnBuilder.writeDouble(aDouble); } break; case TEXT: @@ -444,15 +480,11 @@ public class ValuePageReader { for (int i = readStartIndex; i < readEndIndex; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { - if (satisfied[i - readStartIndex]) { - columnBuilder.appendNull(); - } + columnBuilder.appendNull(); continue; } Binary aBinary = valueDecoder.readBinary(valueBuffer); - if (satisfied[i - readStartIndex]) { - columnBuilder.writeBinary(aBinary); - } + columnBuilder.writeBinary(aBinary); } break; default:
