This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LimitPushDownBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 556def6112037536f8bd6b81e3fcb468e0430f98
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Nov 22 20:23:52 2023 +0800

    Fix limit push down bug in aligned sensor
---
 .../db/it/aligned/IoTDBAlignedLimitPushDownIT.java | 109 ++++++++++++++++++++
 .../tsfile/read/reader/page/AlignedPageReader.java |  87 +++++++++++-----
 .../tsfile/read/reader/page/ValuePageReader.java   | 110 +++++++++++++--------
 3 files changed, 240 insertions(+), 66 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedLimitPushDownIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedLimitPushDownIT.java
new file mode 100644
index 00000000000..381dbbb9949
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedLimitPushDownIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aligned;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAlignedLimitPushDownIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(1,1,1)");
+      statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(2,2,2)");
+      statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(3,3,3)");
+      statement.addBatch("insert into root.db.d1(time,s1,s2) aligned values(4,4,4)");
+      statement.addBatch("flush");
+
+      statement.executeBatch();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void selectWithLimitPushDownTest() {
+
+    String[] retArray = new String[] {"3,3.0", "4,4.0"};
+
+    String[] columnNames = {"root.db.d1.s1"};
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select s1 from root.db.d1 where time >= 3 limit 2;")) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index a2c8909dc19..fadc3009b63 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -191,39 +191,71 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
     long[] timeBatch = timePageReader.getNextTimeBatch();
 
     if (canGoFastWay()) {
-      // skip all the page
-      if (paginationController.hasCurOffset(timeBatch.length)) {
-        paginationController.consumeOffset(timeBatch.length);
-      } else {
-        int readStartIndex =
-            paginationController.hasCurOffset() ? (int) paginationController.getCurOffset() : 0;
-        // consume the remaining offset
-        paginationController.consumeOffset(readStartIndex);
-        // not included
-        int readEndIndex =
-            (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0)
-                    && (paginationController.getCurLimit() < timeBatch.length - readStartIndex + 1)
-                ? readStartIndex + (int) paginationController.getCurLimit()
-                : timeBatch.length;
-        if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) {
-          paginationController.consumeLimit((long) readEndIndex - readStartIndex);
-        }
+      // all page data satisfy
+      if (filter.allSatisfy(getTimeStatistics())) {
+        // skip all the page
+        if (paginationController.hasCurOffset(timeBatch.length)) {
+          paginationController.consumeOffset(timeBatch.length);
+        } else {
+          int readStartIndex =
+              paginationController.hasCurOffset() ? (int) paginationController.getCurOffset() : 0;
+          // consume the remaining offset
+          paginationController.consumeOffset(readStartIndex);
+          // not included
+          int readEndIndex =
+              (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0)
+                      && (paginationController.getCurLimit()
+                          < timeBatch.length - readStartIndex + 1)
+                  ? readStartIndex + (int) paginationController.getCurLimit()
+                  : timeBatch.length;
+          if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) {
+            paginationController.consumeLimit((long) readEndIndex - readStartIndex);
+          }
 
-        boolean[] keepCurrentRow = new boolean[readEndIndex - readStartIndex];
-        if (filter == null) {
-          Arrays.fill(keepCurrentRow, true);
           // construct time column
           for (int i = readStartIndex; i < readEndIndex; i++) {
             builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
             builder.declarePosition();
           }
+
+          // construct value columns
+          for (int i = 0; i < valueCount; i++) {
+            ValuePageReader pageReader = valuePageReaderList.get(i);
+            if (pageReader != null) {
+              pageReader.writeColumnBuilderWithNextBatch(
+                  readStartIndex, readEndIndex, builder.getColumnBuilder(i));
+            } else {
+              builder.getColumnBuilder(i).appendNull(readEndIndex - readStartIndex);
+            }
+          }
+        }
+      } else {
+
+        // if all the sub sensors' value are null in current row, just discard it
+        // if !filter.satisfy, discard this row
+        boolean[] keepCurrentRow = new boolean[timeBatch.length];
+        if (filter == null) {
+          Arrays.fill(keepCurrentRow, true);
         } else {
-          for (int i = readStartIndex; i < readEndIndex; i++) {
-            keepCurrentRow[i - readStartIndex] = filter.satisfy(timeBatch[i], null);
-            // construct time column
-            if (keepCurrentRow[i - readStartIndex]) {
+          for (int i = 0, n = timeBatch.length; i < n; i++) {
+            keepCurrentRow[i] = filter.satisfy(timeBatch[i], null);
+          }
+        }
+
+        // construct time column
+        int readEndIndex = timeBatch.length;
+        for (int i = 0; i < timeBatch.length; i++) {
+          if (keepCurrentRow[i]) {
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              keepCurrentRow[i] = false;
+            } else if (paginationController.hasCurLimit()) {
               builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              readEndIndex = i;
+              break;
             }
           }
         }
@@ -233,16 +265,17 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
           ValuePageReader pageReader = valuePageReaderList.get(i);
           if (pageReader != null) {
             pageReader.writeColumnBuilderWithNextBatch(
-                readStartIndex, readEndIndex, builder.getColumnBuilder(i), keepCurrentRow);
+                readEndIndex, builder.getColumnBuilder(i), keepCurrentRow);
           } else {
-            for (int j = readStartIndex; j < readEndIndex; j++) {
-              if (keepCurrentRow[j - readStartIndex]) {
+            for (int j = 0; j < readEndIndex; j++) {
+              if (keepCurrentRow[j]) {
                 builder.getColumnBuilder(i).appendNull();
               }
             }
           }
         }
       }
+
     } else {
       // if all the sub sensors' value are null in current row, just discard it
       // if !filter.satisfy, discard this row
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
index c158e3077eb..e362c173e31 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
@@ -330,13 +330,69 @@ public class ValuePageReader {
   }
 
   public void writeColumnBuilderWithNextBatch(
-      int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder, boolean[] satisfied) {
+      int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow) {
     if (valueBuffer == null) {
-      for (int i = readStartIndex; i < readEndIndex; i++) {
-        if (satisfied[i - readStartIndex]) {
+      for (int i = 0; i < readEndIndex; i++) {
+        if (keepCurrentRow[i]) {
+          columnBuilder.appendNull();
+        }
+      }
+      return;
+    }
+    for (int i = 0; i < readEndIndex; i++) {
+      if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+        if (keepCurrentRow[i]) {
           columnBuilder.appendNull();
         }
+        continue;
+      }
+      switch (dataType) {
+        case BOOLEAN:
+          boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+          if (keepCurrentRow[i]) {
+            columnBuilder.writeBoolean(aBoolean);
+          }
+          break;
+        case INT32:
+          int anInt = valueDecoder.readInt(valueBuffer);
+          if (keepCurrentRow[i]) {
+            columnBuilder.writeInt(anInt);
+          }
+          break;
+        case INT64:
+          long aLong = valueDecoder.readLong(valueBuffer);
+          if (keepCurrentRow[i]) {
+            columnBuilder.writeLong(aLong);
+          }
+          break;
+        case FLOAT:
+          float aFloat = valueDecoder.readFloat(valueBuffer);
+          if (keepCurrentRow[i]) {
+            columnBuilder.writeFloat(aFloat);
+          }
+          break;
+        case DOUBLE:
+          double aDouble = valueDecoder.readDouble(valueBuffer);
+          if (keepCurrentRow[i]) {
+            columnBuilder.writeDouble(aDouble);
+          }
+          break;
+        case TEXT:
+          Binary aBinary = valueDecoder.readBinary(valueBuffer);
+          if (keepCurrentRow[i]) {
+            columnBuilder.writeBinary(aBinary);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
+    }
+  }
+
+  public void writeColumnBuilderWithNextBatch(
+      int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder) {
+    if (valueBuffer == null) {
+      columnBuilder.appendNull(readEndIndex - readStartIndex);
       return;
     }
 
@@ -349,15 +405,11 @@ public class ValuePageReader {
 
         for (int i = readStartIndex; i < readEndIndex; i++) {
           if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
-            if (satisfied[i - readStartIndex]) {
-              columnBuilder.appendNull();
-            }
+            columnBuilder.appendNull();
             continue;
           }
           boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-          if (satisfied[i - readStartIndex]) {
-            columnBuilder.writeBoolean(aBoolean);
-          }
+          columnBuilder.writeBoolean(aBoolean);
         }
         break;
       case INT32:
@@ -368,15 +420,11 @@ public class ValuePageReader {
 
         for (int i = readStartIndex; i < readEndIndex; i++) {
           if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
-            if (satisfied[i - readStartIndex]) {
-              columnBuilder.appendNull();
-            }
+            columnBuilder.appendNull();
             continue;
           }
           int aInt = valueDecoder.readInt(valueBuffer);
-          if (satisfied[i - readStartIndex]) {
-            columnBuilder.writeInt(aInt);
-          }
+          columnBuilder.writeInt(aInt);
         }
         break;
       case INT64:
@@ -387,15 +435,11 @@ public class ValuePageReader {
 
         for (int i = readStartIndex; i < readEndIndex; i++) {
           if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
-            if (satisfied[i - readStartIndex]) {
-              columnBuilder.appendNull();
-            }
+            columnBuilder.appendNull();
             continue;
           }
           long aLong = valueDecoder.readLong(valueBuffer);
-          if (satisfied[i - readStartIndex]) {
-            columnBuilder.writeLong(aLong);
-          }
+          columnBuilder.writeLong(aLong);
         }
         break;
       case FLOAT:
@@ -406,15 +450,11 @@ public class ValuePageReader {
 
         for (int i = readStartIndex; i < readEndIndex; i++) {
           if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
-            if (satisfied[i - readStartIndex]) {
-              columnBuilder.appendNull();
-            }
+            columnBuilder.appendNull();
             continue;
           }
           float aFloat = valueDecoder.readFloat(valueBuffer);
-          if (satisfied[i - readStartIndex]) {
-            columnBuilder.writeFloat(aFloat);
-          }
+          columnBuilder.writeFloat(aFloat);
         }
         break;
       case DOUBLE:
@@ -425,15 +465,11 @@ public class ValuePageReader {
 
         for (int i = readStartIndex; i < readEndIndex; i++) {
           if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
-            if (satisfied[i - readStartIndex]) {
-              columnBuilder.appendNull();
-            }
+            columnBuilder.appendNull();
             continue;
           }
           double aDouble = valueDecoder.readDouble(valueBuffer);
-          if (satisfied[i - readStartIndex]) {
-            columnBuilder.writeDouble(aDouble);
-          }
+          columnBuilder.writeDouble(aDouble);
         }
         break;
       case TEXT:
@@ -444,15 +480,11 @@ public class ValuePageReader {
 
         for (int i = readStartIndex; i < readEndIndex; i++) {
           if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
-            if (satisfied[i - readStartIndex]) {
-              columnBuilder.appendNull();
-            }
+            columnBuilder.appendNull();
             continue;
           }
           Binary aBinary = valueDecoder.readBinary(valueBuffer);
-          if (satisfied[i - readStartIndex]) {
-            columnBuilder.writeBinary(aBinary);
-          }
+          columnBuilder.writeBinary(aBinary);
         }
         break;
       default:

Reply via email to