This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit d1a082cd11c79497449fda06189cd00d3510b2e9
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Wed Nov 21 14:08:17 2018 +0200

    DRILL-6865: Query returns wrong result when filter pruning happens
---
 .../store/parquet/AbstractParquetGroupScan.java    | 76 +++++++++++++++-------
 .../exec/store/parquet/ParquetFilterBuilder.java   | 21 ++++--
 .../exec/store/parquet/ParquetPushDownFilter.java  | 41 ++++++++++--
 .../store/parquet/ParquetRGFilterEvaluator.java    |  4 +-
 .../store/parquet/TestParquetFilterPushDown.java   |  5 ++
 5 files changed, 112 insertions(+), 35 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 0d35ddb..1bbf63b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -247,7 +247,11 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
     final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
 
-    ParquetFilterPredicate filterPredicate = null;
+    ParquetFilterPredicate filterPredicate = getParquetFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
+
+    if (filterPredicate == null) {
+      return null;
+    }
 
     for (RowGroupInfo rowGroup : rowGroupInfos) {
       final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
@@ -261,27 +265,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
       Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
 
-      if (filterPredicate == null) {
-        ErrorCollector errorCollector = new ErrorCollectorImpl();
-        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
-            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
-
-        if (errorCollector.hasErrors()) {
-          logger.error("{} error(s) encountered when materialize filter expression : {}",
-              errorCollector.getErrorCount(), errorCollector.toErrorString());
-          return null;
-        }
-        logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
-
-        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-        filterPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities);
-
-        if (filterPredicate == null) {
-          return null;
-        }
-      }
-
-      ParquetFilterPredicate.RowsMatch match = ParquetRGFilterEvaluator.matches(filterPredicate, columnStatisticsMap, rowGroup.getRowCount(), parquetTableMetadata, rowGroup.getColumns(), schemaPathsInExpr);
+      ParquetFilterPredicate.RowsMatch match = ParquetRGFilterEvaluator.matches(filterPredicate,
+          columnStatisticsMap, rowGroup.getRowCount(), parquetTableMetadata, rowGroup.getColumns(), schemaPathsInExpr);
       if (match == ParquetFilterPredicate.RowsMatch.NONE) {
         continue; // No row comply to the filter => drop the row group
       }
@@ -310,6 +295,53 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       return null;
     }
   }
+
+  /**
+   * Returns parquet filter predicate built from specified {@code filterExpr}.
+   *
+   * @param filterExpr filter expression to build
+   * @param udfUtilities udf utilities
+   * @param functionImplementationRegistry context to find drill function holder
+   * @param optionManager option manager
+   * @param omitUnsupportedExprs whether expressions which cannot be converted
+   *          may be omitted from the resulting expression
+   * @return parquet filter predicate
+   */
+  public ParquetFilterPredicate getParquetFilterPredicate(LogicalExpression filterExpr,
+      UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry,
+      OptionManager optionManager, boolean omitUnsupportedExprs) {
+    // uses the first row group to obtain the fields list
+    assert rowGroupInfos.size() > 0 : "row groups count cannot be 0";
+    RowGroupInfo rowGroup = rowGroupInfos.iterator().next();
+    ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
+
+    Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(
+        rowGroup.getPath(),
+        getPartitionValues(rowGroup),
+        supportsFileImplicitColumns());
+
+    ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+        parquetTableMetadata,
+        rowGroup.getColumns(),
+        implicitColValues);
+
+    Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
+    Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+
+    ErrorCollector errorCollector = new ErrorCollectorImpl();
+    LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+        filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+    if (errorCollector.hasErrors()) {
+      logger.error("{} error(s) encountered when materialize filter expression : {}",
+          errorCollector.getErrorCount(), errorCollector.toErrorString());
+      return null;
+    }
+    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+
+    Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+    return ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities, omitUnsupportedExprs);
+  }
   // filter push down methods block end
 
   // limit push down methods start
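The new getParquetFilterPredicate() above is now the single entry point for predicate conversion: applyFilter() builds the predicate once, before iterating row groups, and gives up on pruning entirely when conversion fails. A minimal sketch of how a caller might use the two modes follows; the helper and its class are hypothetical, the import paths are a best guess from the classes referenced in the hunks, and only the method signature is taken from the commit itself:

    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
    import org.apache.drill.exec.ops.UdfUtilities;
    import org.apache.drill.exec.server.options.OptionManager;
    import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;

    class PredicateModesSketch {
      // Strict mode (omitUnsupportedExprs = false): a null result means at
      // least one part of the filter is not convertible, so the Filter
      // operator must stay in the plan. Relaxed mode (true) may return a
      // partial predicate, which is safe for row group pruning but not for
      // removing the Filter.
      static boolean isFullyConvertible(AbstractParquetGroupScan scan,
          LogicalExpression filterExpr, UdfUtilities udfUtilities,
          FunctionImplementationRegistry registry, OptionManager options) {
        ParquetFilterPredicate predicate = scan.getParquetFilterPredicate(
            filterExpr, udfUtilities, registry, options, false);
        return predicate != null;
      }
    }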
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
index f0f1029..86e207f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
@@ -63,6 +63,12 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
   static final Logger logger = LoggerFactory.getLogger(ParquetFilterBuilder.class);
 
   private final UdfUtilities udfUtilities;
+  // Flag which indicates whether predicate parts that cannot be converted
+  // to a parquet filter predicate may be omitted from the result.
+  // It should be set to false when the caller wants to verify that the
+  // predicate is fully convertible to a parquet filter predicate;
+  // in that case null is returned instead of a partially converted expression.
+  private final boolean omitUnsupportedExprs;
 
   /**
    * @param expr materialized filter expression
@@ -71,18 +77,24 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
    *
    * @return parquet filter predicate
    */
-  public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
-    LogicalExpression logicalExpression = expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
+  public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr,
+      Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities, boolean omitUnsupportedExprs) {
+    LogicalExpression logicalExpression =
+        expr.accept(new ParquetFilterBuilder(udfUtilities, omitUnsupportedExprs), constantBoundaries);
     if (logicalExpression instanceof ParquetFilterPredicate) {
       return (ParquetFilterPredicate) logicalExpression;
+    } else if (logicalExpression instanceof TypedFieldExpr) {
+      // Calcite simplifies `= true` expressions to a bare field name, so wrap it with an IS TRUE predicate
+      return (ParquetFilterPredicate) ParquetIsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, logicalExpression);
     }
     logger.debug("Logical expression {} was not qualified for filter push down", logicalExpression);
     return null;
   }
 
-  private ParquetFilterBuilder(UdfUtilities udfUtilities) {
+  private ParquetFilterBuilder(UdfUtilities udfUtilities, boolean omitUnsupportedExprs) {
     this.udfUtilities = udfUtilities;
+    this.omitUnsupportedExprs = omitUnsupportedExprs;
   }
 
   @Override
@@ -159,8 +171,9 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
     for (LogicalExpression arg : op.args) {
       LogicalExpression childPredicate = arg.accept(this, value);
       if (childPredicate == null) {
-        if (functionName.equals("booleanOr")) {
+        if (functionName.equals("booleanOr") || !omitUnsupportedExprs) {
           // we can't include any leg of the OR if any of the predicates cannot be converted
+          // or when omitting unconverted operands is prohibited
          return null;
        }
      } else {
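The flag's effect on AND/OR composition is subtle, so here is a self-contained toy model (plain Java, no Drill dependencies, all names editorial) of the rule enforced in the booleanOr hunk above: an OR never survives an unconvertible leg, while an AND may drop one only when omitting is allowed.

    import java.util.ArrayList;
    import java.util.List;

    public class OmissionRuleSketch {
      // Each element stands for one leg of an AND/OR; null marks a leg that
      // cannot be converted to a parquet filter predicate. Returns null when
      // the whole predicate must be rejected, mirroring the visitor above.
      static List<String> convert(String functionName, List<String> legs,
          boolean omitUnsupportedExprs) {
        List<String> converted = new ArrayList<>();
        for (String leg : legs) {
          if (leg == null) {
            if (functionName.equals("booleanOr") || !omitUnsupportedExprs) {
              // dropping an OR leg would strengthen the filter and could
              // prune row groups that actually contain matching rows
              return null;
            }
            // dropping an AND conjunct only weakens the filter: safe for pruning
          } else {
            converted.add(leg);
          }
        }
        return converted;
      }

      public static void main(String[] args) {
        List<String> legs = new ArrayList<>();
        legs.add("a > 1");
        legs.add(null); // e.g. udf(a) = 3, not convertible
        System.out.println(convert("booleanAnd", legs, true));  // [a > 1]
        System.out.println(convert("booleanAnd", legs, false)); // null
        System.out.println(convert("booleanOr", legs, true));   // null
      }
    }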
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index c59cdce..95a0534 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -134,13 +134,32 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
 
     // get a conjunctions of the filter condition. For each conjunction, if it refers to ITEM or FLATTEN expression
     // then we could not pushed down. Otherwise, it's qualified to be pushed down.
-    final List<RexNode> predList = RelOptUtil.conjunctions(condition);
+    final List<RexNode> predList = RelOptUtil.conjunctions(RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition));
 
     final List<RexNode> qualifiedPredList = new ArrayList<>();
+    // list of predicates which cannot be converted to parquet filter predicate
+    List<RexNode> nonConvertedPredList = new ArrayList<>();
 
-    for (final RexNode pred : predList) {
+    for (RexNode pred : predList) {
       if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) == null) {
+        LogicalExpression drillPredicate = DrillOptiq.toDrill(
+            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, pred);
+
+        // checks whether predicate may be used for filter pushdown
+        ParquetFilterPredicate parquetFilterPredicate =
+            groupScan.getParquetFilterPredicate(drillPredicate,
+                optimizerContext,
+                optimizerContext.getFunctionRegistry(),
+                optimizerContext.getPlannerSettings().getOptions(), false);
+        // collects predicates that contain expressions unsupported for filter
+        // pushdown, so that a filter can be built from them later
+        if (parquetFilterPredicate == null) {
+          nonConvertedPredList.add(pred);
+        }
         qualifiedPredList.add(pred);
+      } else {
+        nonConvertedPredList.add(pred);
       }
     }
 
", timer.elapsed(TimeUnit.MILLISECONDS)); @@ -166,10 +185,10 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { return; } - RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); + RelNode newNode = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); if (project != null) { - newScan = project.copy(project.getTraitSet(), Collections.singletonList(newScan)); + newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode)); } if (newGroupScan instanceof AbstractParquetGroupScan) { @@ -182,12 +201,20 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { } } if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) { - call.transformTo(newScan); + // creates filter from the expressions which can't be pushed to the scan + if (nonConvertedPredList.size() > 0) { + newNode = filter.copy(filter.getTraitSet(), newNode, + RexUtil.composeConjunction( + filter.getCluster().getRexBuilder(), + nonConvertedPredList, + true)); + } + call.transformTo(newNode); return; } } - final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newScan)); + final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode)); call.transformTo(newFilter); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java index 281e865..0125149 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java @@ -87,8 +87,8 @@ public class ParquetRGFilterEvaluator { } Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter); - ParquetFilterPredicate parquetPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate( - materializedFilter, constantBoundaries, udfUtilities); + ParquetFilterPredicate parquetPredicate = ParquetFilterBuilder.buildParquetFilterPredicate( + materializedFilter, constantBoundaries, udfUtilities, true); return matches(parquetPredicate, columnStatisticsMap, rowCount); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java index ea12f40..ccc1480 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java @@ -70,6 +70,7 @@ public class TestParquetFilterPushDown extends PlanTestBase { @AfterClass public static void teardown() throws IOException { + fragContext.close(); fs.close(); } @@ -294,6 +295,10 @@ public class TestParquetFilterPushDown extends PlanTestBase { PlanTestBase.testPlanMatchingPatterns(sql + "a < 1 or a > 1", new String[]{"numRowGroups=3"}); // No filter pruning PlanTestBase.testPlanMatchingPatterns(sql + "a < 1 or a > 2", new String[]{"numRowGroups=2"}, new String[]{"Filter\\("}); //Filter pruning + + // Partial filter pruning + testParquetFilterPruning(sql + "a >=1 and cast(a as varchar) like '%3%'", 1, 2, new String[]{">\\($1, 1\\)"}); + testParquetFilterPruning(sql + "a >=1 
and a/3>=1", 2, 2, new String[]{">\\($1, 1\\)"}); } @Test
