This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 8bd81920c05a46e567526f1f913ce70552204734
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Tue Nov 20 20:08:38 2018 +0200

    DRILL-6857: Read only required row groups in a file when limit push down is applied
    
    closes #1548
---
 .../store/parquet/AbstractParquetGroupScan.java    | 91 +++++++++++-----------
 .../store/parquet/TestParquetLimitPushDown.java    | 75 ++++++++++++++++++
 2 files changed, 119 insertions(+), 47 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 9bc969f..0d35ddb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -246,7 +246,6 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
 
     final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
-    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
 
     ParquetFilterPredicate filterPredicate = null;
 
@@ -289,17 +288,15 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       rowGroup.setRowsMatch(match);
 
       qualifiedRGs.add(rowGroup);
-      qualifiedFilePath.add(rowGroup.getPath());
     }
 
     if (qualifiedRGs.size() == rowGroupInfos.size() ) {
       // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyFilter does not have any pruning!");
+      logger.debug("applyFilter() does not have any pruning!");
       return null;
-    } else if (qualifiedFilePath.size() == 0) {
-      logger.debug("All rowgroups have been filtered out. Add back one to get 
schema from scannner");
+    } else if (qualifiedRGs.size() == 0) {
+      logger.debug("All row groups have been filtered out. Add back one to get 
schema from scanner.");
       RowGroupInfo rg = rowGroupInfos.iterator().next();
-      qualifiedFilePath.add(rg.getPath());
       qualifiedRGs.add(rg);
     }
 
@@ -307,11 +304,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
 
     try {
-      AbstractParquetGroupScan cloneGroupScan = cloneWithFileSelection(qualifiedFilePath);
-      cloneGroupScan.rowGroupInfos = qualifiedRGs;
-      cloneGroupScan.parquetGroupScanStatistics.collect(cloneGroupScan.rowGroupInfos, cloneGroupScan.parquetTableMetadata);
-      return cloneGroupScan;
-
+      return cloneWithRowGroupInfos(qualifiedRGs);
     } catch (IOException e) {
       logger.warn("Could not apply filter prune due to Exception : {}", e);
       return null;
@@ -330,29 +323,41 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
     // further optimization : minimize # of files chosen, or the affinity of files chosen.
 
+    if (parquetGroupScanStatistics.getRowCount() <= maxRecords) {
+      logger.debug("limit push down does not apply, since total number of rows 
[{}] is less or equal to the required [{}].",
+        parquetGroupScanStatistics.getRowCount(), maxRecords);
+      return null;
+    }
+
     // Calculate number of rowGroups to read based on maxRecords and update
     // number of records to read for each of those rowGroups.
-    int index = updateRowGroupInfo(maxRecords);
-
-    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
-        .map(ReadEntryWithPath::getPath)
-        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+    List<RowGroupInfo> qualifiedRowGroupInfos = new ArrayList<>(rowGroupInfos.size());
+    int currentRowCount = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (currentRowCount + rowCount <= maxRecords) {
+        currentRowCount += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        qualifiedRowGroupInfos.add(rowGroupInfo);
+        continue;
+      } else if (currentRowCount < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount);
+        qualifiedRowGroupInfos.add(rowGroupInfo);
+      }
+      break;
+    }
 
-    // If there is no change in fileSet, no need to create new groupScan.
-    if (filePaths.size() == fileSet.size() ) {
-      // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyLimit() does not apply!");
+    if (rowGroupInfos.size() == qualifiedRowGroupInfos.size()) {
+      logger.debug("limit push down does not apply, since number of row groups 
was not reduced.");
       return null;
     }
 
-    logger.debug("applyLimit() reduce parquet file # from {} to {}", 
fileSet.size(), filePaths.size());
+    logger.debug("applyLimit() reduce parquet row groups # from {} to {}.", 
rowGroupInfos.size(), qualifiedRowGroupInfos.size());
 
     try {
-      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
-      newScan.updateRowGroupInfo(maxRecords);
-      return newScan;
+      return cloneWithRowGroupInfos(qualifiedRowGroupInfos);
     } catch (IOException e) {
-      logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
+      logger.warn("Could not apply row count based prune due to Exception: {}", e);
       return null;
     }
   }
@@ -454,30 +459,22 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
   // private methods block start
   /**
-   * Based on maxRecords to read for the scan,
-   * figure out how many rowGroups to read
-   * and update number of records to read for each of them.
+   * Clones current group scan with set of file paths from given row groups,
+   * updates new scan with list of given row groups,
+   * re-calculates statistics and endpoint affinities.
    *
-   * @param maxRecords max records to read
-   * @return total number of rowGroups to read
+   * @param rowGroupInfos list of row group infos
+   * @return new parquet group scan
    */
-  private int updateRowGroupInfo(int maxRecords) {
-    long count = 0;
-    int index = 0;
-    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
-      long rowCount = rowGroupInfo.getRowCount();
-      if (count + rowCount <= maxRecords) {
-        count += rowCount;
-        rowGroupInfo.setNumRecordsToRead(rowCount);
-        index++;
-        continue;
-      } else if (count < maxRecords) {
-        rowGroupInfo.setNumRecordsToRead(maxRecords - count);
-        index++;
-      }
-      break;
-    }
-    return index;
+  private AbstractParquetGroupScan cloneWithRowGroupInfos(List<RowGroupInfo> rowGroupInfos) throws IOException {
+    Set<String> filePaths = rowGroupInfos.stream()
+      .map(ReadEntryWithPath::getPath)
+      .collect(Collectors.toSet()); // set keeps file names unique
+    AbstractParquetGroupScan scan = cloneWithFileSelection(filePaths);
+    scan.rowGroupInfos = rowGroupInfos;
+    scan.parquetGroupScanStatistics.collect(scan.rowGroupInfos, scan.parquetTableMetadata);
+    scan.endpointAffinities = AffinityCreator.getAffinityMap(scan.rowGroupInfos);
+    return scan;
   }
   // private methods block end
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLimitPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLimitPushDown.java
new file mode 100644
index 0000000..7749796
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLimitPushDown.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestParquetLimitPushDown extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "multirowgroup.parquet"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "users"));
+    startCluster(builder);
+  }
+
+  @Test
+  public void testMultipleFiles() throws Exception {
+    String query = "select * from dfs.`parquet/users` limit 1";
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+    assertTrue(summary.succeeded());
+    assertEquals(1, summary.recordCount());
+
+    String plan = queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+  }
+
+  @Test
+  public void testMultipleRowGroups() throws Exception {
+    String query = "select * from dfs.`parquet/multirowgroup.parquet` limit 1";
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+    assertTrue(summary.succeeded());
+    assertEquals(1, summary.recordCount());
+
+    String plan = queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+  }
+
+  @Test
+  public void testLimitZero() throws Exception {
+    String query = "select * from dfs.`parquet/users` limit 0";
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+    assertTrue(summary.succeeded());
+    assertEquals(0, summary.recordCount());
+
+    String plan = queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+  }
+
+}
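
For readers skimming the patch, the core of the new limit push-down is easiest to see outside the diff. Below is a minimal, self-contained Java sketch of the row-group selection loop added to applyLimit(): whole row groups are kept while they still fit under the limit, the first group that would overflow is kept but truncated, and all later groups are dropped. The RowGroup class and main() driver here are hypothetical stand-ins for illustration, not Drill's RowGroupInfo API.

import java.util.ArrayList;
import java.util.List;

public class LimitPushDownSketch {

  // Hypothetical stand-in for Drill's RowGroupInfo.
  static class RowGroup {
    final long rowCount;   // rows stored in this row group
    long numRecordsToRead; // rows the scanner should actually read

    RowGroup(long rowCount) {
      this.rowCount = rowCount;
    }
  }

  // Keep whole row groups while they fit under maxRecords, truncate the first
  // group that would overflow, and drop everything after it.
  static List<RowGroup> applyLimit(List<RowGroup> rowGroups, int maxRecords) {
    maxRecords = Math.max(maxRecords, 1); // request at least 1 row -> 1 row group
    List<RowGroup> qualified = new ArrayList<>(rowGroups.size());
    long currentRowCount = 0;
    for (RowGroup rowGroup : rowGroups) {
      if (currentRowCount + rowGroup.rowCount <= maxRecords) {
        currentRowCount += rowGroup.rowCount;
        rowGroup.numRecordsToRead = rowGroup.rowCount;
        qualified.add(rowGroup);
        continue;
      } else if (currentRowCount < maxRecords) {
        rowGroup.numRecordsToRead = maxRecords - currentRowCount;
        qualified.add(rowGroup);
      }
      break;
    }
    return qualified;
  }

  public static void main(String[] args) {
    List<RowGroup> groups = new ArrayList<>();
    groups.add(new RowGroup(10));
    groups.add(new RowGroup(10));
    groups.add(new RowGroup(10));
    List<RowGroup> picked = applyLimit(groups, 15);
    // Prints "2 groups, last reads 5": the third group is never scanned.
    System.out.println(picked.size() + " groups, last reads "
        + picked.get(picked.size() - 1).numRecordsToRead);
  }
}

With three row groups of 10 rows each and a limit of 15, only two row groups are selected and the second is told to read just 5 records. This is the reduction the numRowGroups=1 assertions in the new TestParquetLimitPushDown verify for LIMIT 1 (and for LIMIT 0, which the code bumps to 1 so the scanner can still produce a schema).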
