paul-rogers commented on a change in pull request #1914: DRILL-7458: Base framework for storage plugins URL: https://github.com/apache/drill/pull/1914#discussion_r369342360
########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.base.filter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.util.Pair; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; +import org.apache.drill.exec.planner.logical.DrillFilterRel; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.logical.DrillProjectRel; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.FilterPrel; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; + +/** + * Generalized filter push-down strategy which performs all the tree-walking + * and tree restructuring work, allowing a "listener" to do the work needed + * for a particular scan. + * <p> + * General usage in a storage plugin: <code><pre> + * public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules( + * OptimizerRulesContext optimizerRulesContext) { + * return FilterPushDownStrategy.rulesFor(optimizerRulesContext, + * new MyPushDownListener(...)); + * } + * </pre></code> + */ + +public class FilterPushDownStrategy { + + private static final Collection<String> BANNED_OPERATORS = + Lists.newArrayList("flatten"); + + /** + * Base rule that passes target information to the push-down strategy + */ + + private static abstract class AbstractFilterPushDownRule extends StoragePluginOptimizerRule { + + protected final FilterPushDownStrategy strategy; + + public AbstractFilterPushDownRule(RelOptRuleOperand operand, String description, + FilterPushDownStrategy strategy) { + super(operand, description); + this.strategy = strategy; + } + } + + /** + * Calcite rule for FILTER --> PROJECT --> SCAN + */ + + private static class ProjectAndFilterRule extends AbstractFilterPushDownRule { + + private ProjectAndFilterRule(FilterPushDownStrategy strategy) { + super(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), + strategy.namePrefix() + "PushDownFilter:Filter_On_Project", + strategy); + } + + @Override + public boolean matches(RelOptRuleCall call) { + if (!super.matches(call)) { + return false; + } + DrillScanRel scan = call.rel(2); + return strategy.isTargetScan(scan); + } + + @Override + public void onMatch(RelOptRuleCall call) { + DrillFilterRel filterRel = call.rel(0); + DrillProjectRel projectRel = call.rel(1); + DrillScanRel scanRel = call.rel(2); + strategy.onMatch(call, filterRel, projectRel, scanRel); + } + } + + /** + * Calcite rule for FILTER --> SCAN + */ + + private static class FilterWithoutProjectRule extends AbstractFilterPushDownRule { + + private FilterWithoutProjectRule(FilterPushDownStrategy strategy) { + super(RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), + strategy.namePrefix() + "PushDownFilter:Filter_On_Scan", + strategy); + } + + @Override + public boolean matches(RelOptRuleCall call) { + if (!super.matches(call)) { + return false; + } + DrillScanRel scan = call.rel(1); + return strategy.isTargetScan(scan); + } + + @Override + public void onMatch(RelOptRuleCall call) { + DrillFilterRel filterRel = call.rel(0); + DrillScanRel scanRel = call.rel(1); + strategy.onMatch(call, filterRel, null, scanRel); + } + } + + private final FilterPushDownListener listener; + + public FilterPushDownStrategy(FilterPushDownListener listener) { + this.listener = listener; + } + + public Set<StoragePluginOptimizerRule> rules() { + return ImmutableSet.of( + new ProjectAndFilterRule(this), + new FilterWithoutProjectRule(this)); + } + + public static Set<StoragePluginOptimizerRule> rulesFor( + OptimizerRulesContext optimizerContext, FilterPushDownListener listener) { + return new FilterPushDownStrategy(listener).rules(); + } + + private String namePrefix() { return listener.prefix(); } + + private boolean isTargetScan(DrillScanRel scan) { + return listener.isTargetScan(scan.getGroupScan()); + } + + public void onMatch(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan) { + + // Skip if rule has already been applied. + + if (!listener.needsApplication(scan.getGroupScan())) { Review comment: Thanks for pointing this out. It is not very obvious what Calcite might be doing. In which planner phase will the above transform occur? I tried doing the filter push down in the physical planning phase, but that is too late to affect parallelization. So, I shifted it to the logical phase in which (presumably) we have the original query blocks before moving things around. What I need is a phase after the above join planning completes, but before the physical planning phase. Which phase might that be? One of the limitations of rules-based engines (discussed in the Cascades planner paper which was a follow up to the classic Volcano planner paper) is that they tend to need lots of phases to get things to occur in the right order. I hope we don't need to introduce a new phase to solve this problem. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
