This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 8bd81920c05a46e567526f1f913ce70552204734
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Tue Nov 20 20:08:38 2018 +0200

    DRILL-6857: Read only required row groups in a file when limit push down is applied

    closes #1548
---
 .../store/parquet/AbstractParquetGroupScan.java    | 91 +++++++++++-----------
 .../store/parquet/TestParquetLimitPushDown.java    | 75 ++++++++++++++++++
 2 files changed, 119 insertions(+), 47 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 9bc969f..0d35ddb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -246,7 +246,6 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
 
     final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
-    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
 
     ParquetFilterPredicate filterPredicate = null;
 
@@ -289,17 +288,15 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
       rowGroup.setRowsMatch(match);
       qualifiedRGs.add(rowGroup);
-      qualifiedFilePath.add(rowGroup.getPath());
     }
 
     if (qualifiedRGs.size() == rowGroupInfos.size() ) {
       // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyFilter does not have any pruning!");
+      logger.debug("applyFilter() does not have any pruning!");
       return null;
-    } else if (qualifiedFilePath.size() == 0) {
-      logger.debug("All rowgroups have been filtered out. Add back one to get schema from scannner");
+    } else if (qualifiedRGs.size() == 0) {
+      logger.debug("All row groups have been filtered out. Add back one to get schema from scanner.");
       RowGroupInfo rg = rowGroupInfos.iterator().next();
-      qualifiedFilePath.add(rg.getPath());
       qualifiedRGs.add(rg);
     }
 
@@ -307,11 +304,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
         ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
 
     try {
-      AbstractParquetGroupScan cloneGroupScan = cloneWithFileSelection(qualifiedFilePath);
-      cloneGroupScan.rowGroupInfos = qualifiedRGs;
-      cloneGroupScan.parquetGroupScanStatistics.collect(cloneGroupScan.rowGroupInfos, cloneGroupScan.parquetTableMetadata);
-      return cloneGroupScan;
-
+      return cloneWithRowGroupInfos(qualifiedRGs);
     } catch (IOException e) {
       logger.warn("Could not apply filter prune due to Exception : {}", e);
       return null;
@@ -330,29 +323,41 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
     // further optimization : minimize # of files chosen, or the affinity of files chosen.
 
+    if (parquetGroupScanStatistics.getRowCount() <= maxRecords) {
+      logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
+        parquetGroupScanStatistics.getRowCount(), maxRecords);
+      return null;
+    }
+
     // Calculate number of rowGroups to read based on maxRecords and update
     // number of records to read for each of those rowGroups.
-    int index = updateRowGroupInfo(maxRecords);
-
-    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
-        .map(ReadEntryWithPath::getPath)
-        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+    List<RowGroupInfo> qualifiedRowGroupInfos = new ArrayList<>(rowGroupInfos.size());
+    int currentRowCount = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (currentRowCount + rowCount <= maxRecords) {
+        currentRowCount += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        qualifiedRowGroupInfos.add(rowGroupInfo);
+        continue;
+      } else if (currentRowCount < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount);
+        qualifiedRowGroupInfos.add(rowGroupInfo);
+      }
+      break;
+    }
 
-    // If there is no change in fileSet, no need to create new groupScan.
-    if (filePaths.size() == fileSet.size() ) {
-      // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyLimit() does not apply!");
+    if (rowGroupInfos.size() == qualifiedRowGroupInfos.size()) {
+      logger.debug("limit push down does not apply, since number of row groups was not reduced.");
       return null;
     }
 
-    logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), filePaths.size());
+    logger.debug("applyLimit() reduce parquet row groups # from {} to {}.", rowGroupInfos.size(), qualifiedRowGroupInfos.size());
 
     try {
-      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
-      newScan.updateRowGroupInfo(maxRecords);
-      return newScan;
+      return cloneWithRowGroupInfos(qualifiedRowGroupInfos);
     } catch (IOException e) {
-      logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
+      logger.warn("Could not apply row count based prune due to Exception: {}", e);
       return null;
     }
   }
@@ -454,30 +459,22 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   // private methods block start
 
   /**
-   * Based on maxRecords to read for the scan,
-   * figure out how many rowGroups to read
-   * and update number of records to read for each of them.
+   * Clones current group scan with set of file paths from given row groups,
+   * updates new scan with list of given row groups,
+   * re-calculates statistics and endpoint affinities.
    *
-   * @param maxRecords max records to read
-   * @return total number of rowGroups to read
+   * @param rowGroupInfos list of row group infos
+   * @return new parquet group scan
    */
-  private int updateRowGroupInfo(int maxRecords) {
-    long count = 0;
-    int index = 0;
-    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
-      long rowCount = rowGroupInfo.getRowCount();
-      if (count + rowCount <= maxRecords) {
-        count += rowCount;
-        rowGroupInfo.setNumRecordsToRead(rowCount);
-        index++;
-        continue;
-      } else if (count < maxRecords) {
-        rowGroupInfo.setNumRecordsToRead(maxRecords - count);
-        index++;
-      }
-      break;
-    }
-    return index;
+  private AbstractParquetGroupScan cloneWithRowGroupInfos(List<RowGroupInfo> rowGroupInfos) throws IOException {
+    Set<String> filePaths = rowGroupInfos.stream()
+      .map(ReadEntryWithPath::getPath)
+      .collect(Collectors.toSet()); // set keeps file names unique
+    AbstractParquetGroupScan scan = cloneWithFileSelection(filePaths);
+    scan.rowGroupInfos = rowGroupInfos;
+    scan.parquetGroupScanStatistics.collect(scan.rowGroupInfos, scan.parquetTableMetadata);
+    scan.endpointAffinities = AffinityCreator.getAffinityMap(scan.rowGroupInfos);
+    return scan;
   }
 
   // private methods block end
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLimitPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLimitPushDown.java
new file mode 100644
index 0000000..7749796
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLimitPushDown.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestParquetLimitPushDown extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "multirowgroup.parquet"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "users"));
+    startCluster(builder);
+  }
+
+  @Test
+  public void testMultipleFiles() throws Exception {
+    String query = "select * from dfs.`parquet/users` limit 1";
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+    assertTrue(summary.succeeded());
+    assertEquals(1, summary.recordCount());
+
+    String plan = queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+  }
+
+  @Test
+  public void testMultipleRowGroups() throws Exception {
+    String query = "select * from dfs.`parquet/multirowgroup.parquet` limit 1";
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+    assertTrue(summary.succeeded());
+    assertEquals(1, summary.recordCount());
+
+    String plan = queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+  }
+
+  @Test
+  public void testLimitZero() throws Exception {
+    String query = "select * from dfs.`parquet/users` limit 0";
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(query).run();
+    assertTrue(summary.succeeded());
+    assertEquals(0, summary.recordCount());
+
+    String plan = queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+  }
+
+}
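
For readers skimming the diff, the heart of the change is the row group selection loop in applyLimit(). The standalone sketch below (hypothetical names, not Drill's API; RowGroup stands in for Drill's RowGroupInfo) shows the same accumulate-then-trim behavior in isolation: take whole row groups while they fit under the limit, trim the first one that overshoots via numRecordsToRead, and drop everything after it.

    import java.util.ArrayList;
    import java.util.List;

    public class LimitPushDownSketch {

      // Hypothetical stand-in for Drill's RowGroupInfo.
      static class RowGroup {
        final long rowCount;     // total rows stored in this row group
        long numRecordsToRead;   // rows the scan will actually read from it

        RowGroup(long rowCount) {
          this.rowCount = rowCount;
        }
      }

      // Same accumulate-then-trim logic as the patched applyLimit().
      static List<RowGroup> applyLimit(List<RowGroup> rowGroups, int maxRecords) {
        List<RowGroup> qualified = new ArrayList<>(rowGroups.size());
        long currentRowCount = 0;
        for (RowGroup rg : rowGroups) {
          if (currentRowCount + rg.rowCount <= maxRecords) {
            currentRowCount += rg.rowCount;
            rg.numRecordsToRead = rg.rowCount;                  // read it fully
            qualified.add(rg);
            continue;
          } else if (currentRowCount < maxRecords) {
            rg.numRecordsToRead = maxRecords - currentRowCount; // partial read
            qualified.add(rg);
          }
          break; // limit covered; remaining row groups are never opened
        }
        return qualified;
      }

      public static void main(String[] args) {
        List<RowGroup> groups = new ArrayList<>();
        groups.add(new RowGroup(100));
        groups.add(new RowGroup(100));
        groups.add(new RowGroup(100));
        List<RowGroup> picked = applyLimit(groups, 150);
        // Prints "2 row groups, last reads 50": first taken fully, second trimmed.
        System.out.println(picked.size() + " row groups, last reads "
            + picked.get(picked.size() - 1).numRecordsToRead);
      }
    }

Keeping the result at row group granularity (via cloneWithRowGroupInfos) rather than file granularity (the old cloneWithFileSelection path) is what lets the scan stop partway through a multi-row-group file instead of reading every row group in each selected file.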
