DRILL-1386: Push directory filters into group scan for filesystem queries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9628f9bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9628f9bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9628f9bb Branch: refs/heads/smp-merge-230914 Commit: 9628f9bb5b8c5a9d842743bc2d127b3306be5c4b Parents: 8def6e9 Author: Aman Sinha <asi...@maprtech.com> Authored: Thu Sep 11 13:02:16 2014 -0700 Committer: Aman Sinha <asi...@maprtech.com> Committed: Wed Sep 24 13:22:58 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 +- .../physical/base/AbstractFileGroupScan.java | 43 ++++ .../exec/physical/base/AbstractGroupScan.java | 7 + .../drill/exec/physical/base/FileGroupScan.java | 29 +++ .../drill/exec/physical/base/GroupScan.java | 5 + .../exec/planner/common/DrillScanRelBase.java | 10 + .../exec/planner/logical/DirPathBuilder.java | 257 +++++++++++++++++++ .../DrillPushPartitionFilterIntoScan.java | 138 ++++++++++ .../exec/planner/logical/DrillRuleSets.java | 1 + .../exec/planner/logical/DrillScanRel.java | 18 +- .../exec/planner/physical/PlannerSettings.java | 4 + .../server/options/SystemOptionManager.java | 1 + .../exec/store/dfs/easy/EasyGroupScan.java | 34 ++- .../exec/store/parquet/ParquetGroupScan.java | 22 +- .../org/apache/drill/TestExampleQueries.java | 1 + 15 files changed, 560 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 933bfbe..c210541 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -66,7 +66,6 @@ public interface ExecConstants { public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs"; public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size"; public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; - public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label"; public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets"; public static final String TOP_LEVEL_MAX_ALLOC = "drill.exec.memory.top.max"; public static final String HTTP_ENABLE = "drill.exec.http.enabled"; @@ -88,6 +87,8 @@ public interface ExecConstants { public static String JSON_ALL_TEXT_MODE = "store.json.all_text_mode"; public static OptionValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE, false); + public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label"; + public static final OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir"); public static final String SLICE_TARGET = "planner.slice_target"; public static final OptionValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE, 100000); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java new file mode 100644 index 
0000000..ee809fc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.base; + +import java.io.IOException; + +import org.apache.drill.exec.store.dfs.FileSelection; + +public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class); + + + @Override + public void modifyFileSelection(FileSelection selection) { + throw new UnsupportedOperationException(); + } + + @Override + public FileGroupScan clone(FileSelection selection) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean supportsPartitionFilterPushdown() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index c9b8e80..5d0d9bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -61,11 +61,18 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca return 0; } + @Override @JsonIgnore public boolean canPushdownProjects(List<SchemaPath> columns) { return false; } + @Override + @JsonIgnore + public boolean supportsPartitionFilterPushdown() { + return false; + } + /** * By default, throw exception, since group scan does not have exact column value count. */ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java new file mode 100644 index 0000000..552d1e8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.base; + +import java.io.IOException; + +import org.apache.drill.exec.store.dfs.FileSelection; + +public interface FileGroupScan extends GroupScan { + + public void modifyFileSelection(FileSelection selection); + + public FileGroupScan clone(FileSelection selection) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index a88a5ec..2f94995 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -68,4 +68,9 @@ public interface GroupScan extends Scan, HasAffinity{ */ public long getColumnValueCount(SchemaPath column); + /** + * Whether or not this GroupScan supports pushdown of partition filters (directories for filesystems) + */ + public boolean supportsPartitionFilterPushdown(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java index 0934818..33581a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java @@ -38,4 +38,14 @@ public abstract class DrillScanRelBase extends TableAccessRelBase implements Dri assert drillTable != null; } + public DrillScanRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelOptTable table, + DrillTable drillTable) { + super(cluster, traits, table); + this.drillTable = drillTable; + } + + public DrillTable getDrillTable() { + return drillTable; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java new file mode 100644 index 0000000..f911d7c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.eigenbase.relopt.RelOptUtil; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.rex.RexBuilder; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexCorrelVariable; +import org.eigenbase.rex.RexDynamicParam; +import org.eigenbase.rex.RexFieldAccess; +import org.eigenbase.rex.RexInputRef; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexLocalRef; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexOver; +import org.eigenbase.rex.RexRangeRef; +import org.eigenbase.rex.RexUtil; +import org.eigenbase.rex.RexVisitorImpl; +import org.eigenbase.sql.SqlKind; +import org.eigenbase.sql.SqlSyntax; + +import com.google.common.collect.Lists; + +public class DirPathBuilder extends RexVisitorImpl <SchemaPath> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirPathBuilder.class); + + static final int MAX_NESTED_SUBDIRS = 10; // allow up to 10 nested sub-directories + static final String EMPTY_STRING = ""; + + final private DrillFilterRel filterRel; + final private DrillScanRelBase scanRel; + final private RexBuilder builder; + final private String dirLabel; + + private List<String> dirNameList = Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS); + private List<RexNode> conjunctList = Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS); + private List<String> dirPathList = Lists.newArrayList(); + private RexNode currentConjunct = null; // the current conjunct we are evaluating during visitor traversal + private RexNode finalCondition = null; // placeholder for the final filter condition + private boolean dirMatch = false; + + DirPathBuilder(DrillFilterRel filterRel, DrillScanRelBase scanRel, RexBuilder builder, String 
dirLabel) { + super(true); + this.filterRel = filterRel; + this.scanRel = scanRel; + this.builder = builder; + this.dirLabel = dirLabel; + this.finalCondition = filterRel.getCondition(); + } + + private void initPathComponents() { + for (int i=0; i < MAX_NESTED_SUBDIRS; i++) { + dirNameList.add(EMPTY_STRING); + conjunctList.add(null); + } + } + + /** + * Build a directory path string for filter conditions containing directory filters. + * For example, suppose we have directory hierarchy: + * {orders/2012/Jan...Dec, orders/2013/Jan...Dec, orders/2014/Jan...Dec} + * path will be built for following types of filters (More types of filters will be added in the future): + * 1. SELECT * FROM <path>/orders WHERE o_custkey = 5 AND dir0 = '2014' AND dir1 = 'June' + * 2. SELECT * FROM <path>/orders WHERE (dir0 = '2013' AND dir1 = 'June') OR (dir0 = '2014' AND dir1 = 'June') + * For (1) dirPath = <path>/orders/2014/June + * For (2) there are 2 dirPaths: {<path>/orders/2013/June, <path>/orders/2014/June} + * @return The list of strings corresponding to directory paths + */ + public List<String> getDirPath() { + List<RexNode> disjuncts = RelOptUtil.disjunctions(filterRel.getCondition()); + boolean buildDisjunction = false; + List<RexNode> newDisjunctList = Lists.newArrayList(); + + for (RexNode d : disjuncts) { // process the top-level disjuncts + List<RexNode> conjuncts = RelOptUtil.conjunctions(d); + String dirPath = EMPTY_STRING; + initPathComponents(); + + boolean buildConjunction = false; + + // go through the conjuncts to identify the directory filters + for (RexNode c : conjuncts) { + currentConjunct = c; + SchemaPath expr = c.accept(this); + + if (expr != null) { + logger.debug("Found directory filter: " + expr.getRootSegment().getPath()); + } + } + + String prevPath = dirNameList.get(0); + + // compose the final path string + for (int i = 0; i < dirNameList.size(); i++) { + String path = dirNameList.get(i); + if (i > 0) { + prevPath = dirNameList.get(i-1); + } + // 
Check if both the current path and the previous path are non-empty; currently + // we will only push a dir<N> filter if dir<N-1> filter is also specified + if (!path.equals(EMPTY_STRING) && !prevPath.equals(EMPTY_STRING)) { + dirPath += "/" + path; + + // since we are pushing this directory filter we should remove it from the + // list of conjuncts + RexNode thisConjunct = conjunctList.get(i); + conjuncts.remove(thisConjunct); + buildConjunction = true; + } + } + if (!dirPath.equals(EMPTY_STRING)) { + dirPathList.add(dirPath); + } + if (buildConjunction) { + RexNode newConjunct = RexUtil.composeConjunction(builder, conjuncts, false); + newDisjunctList.add(newConjunct); + buildDisjunction = true; + } + + } // for (disjuncts) + + if (buildDisjunction) { + this.finalCondition = RexUtil.composeDisjunction(builder, newDisjunctList, false); + } + return dirPathList; + } + + public RexNode getFinalCondition() { + return finalCondition; + } + + @Override + public SchemaPath visitInputRef(RexInputRef inputRef) { + final int index = inputRef.getIndex(); + final RelDataTypeField field = scanRel.getRowType().getFieldList().get(index); + return FieldReference.getWithQuotedRef(field.getName()); + } + + @Override + public SchemaPath visitCall(RexCall call) { + logger.debug("RexCall {}, {}", call); + final SqlSyntax syntax = call.getOperator().getSyntax(); + switch (syntax) { + case BINARY: + if (call.getKind() == SqlKind.EQUALS) { + dirMatch = false; + // TODO: an assumption here is that the binary predicate is of the form <column> = <value>. + // In order to handle predicates of the form '<value> = <column>' we would need to canonicalize + // the predicate first before calling this function. 
+ SchemaPath e1 = call.getOperands().get(0).accept(this); + + if (dirMatch && e1 != null) { + // get the index for the 'dir<N>' filter + String dirName = e1.getRootSegment().getPath(); + String suffix = dirName.substring(dirLabel.length()); // get the numeric suffix from 'dir<N>' + int suffixIndex = Integer.parseInt(suffix); + + if (suffixIndex >= MAX_NESTED_SUBDIRS) { + return null; + } + + // SchemaPath e2 = call.getOperands().get(1).accept(this); + if (call.getOperands().get(1).getKind() == SqlKind.LITERAL) { + String e2 = ((RexLiteral)call.getOperands().get(1)).getValue2().toString(); + dirNameList.set(suffixIndex, e2); + // dirNameList.set(suffixIndex, e2.getRootSegment().getPath()); + conjunctList.set(suffixIndex, currentConjunct); + return e1; + } + } + } + + return null; + + case SPECIAL: + switch(call.getKind()) { + case CAST: + return getInputFromCast(call); + + default: + + } + // fall through + default: + // throw new AssertionError("Unexpected expression"); + + } + return null; + } + + private SchemaPath getInputFromCast(RexCall call){ + SchemaPath arg = call.getOperands().get(0).accept(this); + + if (arg != null && arg.getRootSegment().getPath().matches(dirLabel+"[0-9]")) { + dirMatch = true; + return arg; + } + return null; + } + + @Override + public SchemaPath visitLocalRef(RexLocalRef localRef) { + return null; + } + + @Override + public SchemaPath visitOver(RexOver over) { + return null; + } + + @Override + public SchemaPath visitCorrelVariable(RexCorrelVariable correlVariable) { + return null; + } + + @Override + public SchemaPath visitDynamicParam(RexDynamicParam dynamicParam) { + return null; + } + + @Override + public SchemaPath visitRangeRef(RexRangeRef rangeRef) { + return null; + } + + @Override + public SchemaPath visitFieldAccess(RexFieldAccess fieldAccess) { + return super.visitFieldAccess(fieldAccess); + } + + @Override + public SchemaPath visitLiteral(RexLiteral literal) { + return 
FieldReference.getWithQuotedRef(literal.getValue2().toString()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java new file mode 100644 index 0000000..940b6c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.planner.logical; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.physical.base.FileGroupScan; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.rex.RexNode; + +import com.google.common.collect.Lists; + +public class DrillPushPartitionFilterIntoScan extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillPushPartitionFilterIntoScan(); + + private DrillPushPartitionFilterIntoScan() { + super(RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), "DrillPushPartitionFilterIntoScan"); + } + + private FormatSelection splitFilter(FormatSelection origSelection, DirPathBuilder builder) { + + List<String> origFiles = origSelection.getAsFiles(); + String pathPrefix = origSelection.getSelection().selectionRoot; + + List<String> newFiles = Lists.newArrayList(); + + List<String> dirPathList = builder.getDirPath(); + + for (String dirPath : dirPathList) { + String fullPath = pathPrefix + dirPath; + // check containment of this path in the list of files + for (String origFilePath : origFiles) { + String[] components = origFilePath.split(":"); // some paths are of the form 'file:<path>', so we need to split + assert (components.length <= 2); + String origFileName = ""; + if (components.length == 1) { + origFileName = components[0]; + } else if (components.length == 2) { + origFileName = components[1]; + } else { + assert false ; + } + if (origFileName.startsWith(fullPath)) { + newFiles.add(origFileName); + } + } + } + + if (newFiles.size() > 0) { + FileSelection newFileSelection = new FileSelection(newFiles, 
origSelection.getSelection().selectionRoot, true); + FormatSelection newFormatSelection = new FormatSelection(origSelection.getFormat(), newFileSelection); + return newFormatSelection; + } + + return origSelection; + } + + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(1); + return scan.getGroupScan().supportsPartitionFilterPushdown(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillScanRel scanRel = (DrillScanRel) call.rel(1); + + PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); + DirPathBuilder builder = new DirPathBuilder(filterRel, scanRel, filterRel.getCluster().getRexBuilder(), settings.getFsPartitionColumnLabel()); + + FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection(); + FormatSelection newSelection = splitFilter(origSelection, builder); + + if (origSelection == newSelection) { + return; // no directory filter was pushed down + } + + RexNode newFilterCondition = builder.getFinalCondition(); + + try { + FileGroupScan fgscan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection.getSelection()); + + if (newFilterCondition.isAlwaysTrue()) { + // TODO: temporarily keep the original filter until we resolve bugs + newFilterCondition = filterRel.getCondition(); + } + /* + final DrillScanRel newScanRel = + new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scanRel.getTable(), + fgscan, + filterRel.getRowType(), + scanRel.getColumns()); + call.transformTo(newScanRel); + } else { + */ + final DrillScanRel newScanRel = + new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scanRel.getTable(), + fgscan, + scanRel.getRowType(), + scanRel.getColumns()); + final DrillFilterRel newFilterRel = new DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(), 
newScanRel, newFilterCondition); + call.transformTo(newFilterRel); + // } + } catch (IOException e) { + throw new DrillRuntimeException(e) ; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 1d3ce9a..ff1d5e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -97,6 +97,7 @@ public class DrillRuleSets { // PushSortPastProjectRule.INSTANCE, // DrillPushProjIntoScan.INSTANCE, + DrillPushPartitionFilterIntoScan.INSTANCE, //////////////////////////////// DrillScanRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java index 5a0cc08..45e1058 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -52,6 +52,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { final private RelDataType rowType; private GroupScan groupScan; + private List<SchemaPath> columns; /** Creates a DrillScan. 
*/ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, @@ -66,13 +67,23 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { RelOptTable table, RelDataType rowType, List<SchemaPath> columns) { super(DRILL_LOGICAL, cluster, traits, table); this.rowType = rowType; - columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns; + this.columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns; try { - this.groupScan = drillTable.getGroupScan().clone(columns); + this.groupScan = drillTable.getGroupScan().clone(this.columns); } catch (IOException e) { throw new DrillRuntimeException("Failure creating scan.", e); } } + + /** Creates a DrillScanRel for a particular GroupScan */ + public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, GroupScan groupScan, RelDataType rowType, List<SchemaPath> columns) { + super(DRILL_LOGICAL, cluster, traits, table); + this.rowType = rowType; + this.columns = columns; + this.groupScan = groupScan; + } + // // private static GroupScan getCopy(GroupScan scan){ // try { @@ -82,6 +93,9 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { // } // } + public List<SchemaPath> getColumns() { + return this.columns; + } @Override public LogicalOperator implement(DrillImplementor implementor) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 5ec52c9..a036a46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -121,6 +121,10 @@ public class PlannerSettings implements Context{ return options.getOption(ExecConstants.ENABLE_MEMORY_ESTIMATION_KEY).bool_val; } + public String getFsPartitionColumnLabel() { + return options.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; + } + @Override public <T> T unwrap(Class<T> clazz) { if(clazz == PlannerSettings.class){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 4fa61e1..cfb7d08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -63,6 +63,7 @@ public class SystemOptionManager implements OptionManager { ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR, + ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR, ExecConstants.SLICE_TARGET_OPTION, ExecConstants.AFFINITY_FACTOR, ExecConstants.MAX_WIDTH_GLOBAL, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 8efcd2c..35d1868 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -25,7 +25,8 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractFileGroupScan; +import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; @@ -49,12 +50,12 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @JsonTypeName("fs-scan") -public class EasyGroupScan extends AbstractGroupScan{ +public class EasyGroupScan extends AbstractFileGroupScan{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class); - private final FileSelection selection; + private FileSelection selection; private final EasyFormatPlugin<?> formatPlugin; - private final int maxWidth; + private int maxWidth; private List<SchemaPath> columns; private ListMultimap<Integer, CompleteFileWork> mappings; @@ -92,10 +93,7 @@ public class EasyGroupScan extends AbstractGroupScan{ this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); this.columns = columns == null || columns.size() == 0? 
ALL_COLUMNS : columns; this.selectionRoot = selectionRoot; - BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); - this.maxWidth = chunks.size(); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); + initFromSelection(selection, formatPlugin); } private EasyGroupScan(EasyGroupScan that) { @@ -110,6 +108,14 @@ public class EasyGroupScan extends AbstractGroupScan{ mappings = that.mappings; } + private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException { + this.selection = selection; + BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); + this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); + this.maxWidth = chunks.size(); + this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); + } + public String getSelectionRoot() { return selectionRoot; } @@ -148,6 +154,11 @@ public class EasyGroupScan extends AbstractGroupScan{ } @Override + public void modifyFileSelection(FileSelection selection) { + this.selection = selection; + } + + @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); return new EasyGroupScan(this); @@ -222,6 +233,13 @@ public class EasyGroupScan extends AbstractGroupScan{ } @Override + public FileGroupScan clone(FileSelection selection) throws IOException { + EasyGroupScan newScan = new EasyGroupScan(this); + newScan.initFromSelection(selection, formatPlugin); + return newScan; + } + + @Override @JsonIgnore public boolean canPushdownProjects(List<SchemaPath> columns) { return formatPlugin.supportsPushDown(); 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 86e5224..6d3cf5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -31,13 +31,15 @@ import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractFileGroupScan; +import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import org.apache.drill.exec.store.dfs.ReadEntryWithPath; import org.apache.drill.exec.store.dfs.easy.FileWork; @@ -70,7 +72,7 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @JsonTypeName("parquet-scan") -public class ParquetGroupScan extends AbstractGroupScan { +public class ParquetGroupScan extends AbstractFileGroupScan { static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); static final MetricRegistry metrics = DrillMetrics.getInstance(); static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter"); @@ -249,6 +251,14 @@ public class ParquetGroupScan extends AbstractGroupScan { return this.fs; } + @Override + public void modifyFileSelection(FileSelection selection) { + entries.clear(); + for (String fileName : selection.getAsFiles()) { + entries.add(new ReadEntryWithPath(fileName)); + } + } + public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork { private EndpointByteMap byteMap; @@ -388,6 +398,14 @@ public class ParquetGroupScan extends AbstractGroupScan { } @Override + public FileGroupScan clone(FileSelection selection) throws IOException { + ParquetGroupScan newScan = new ParquetGroupScan(this); + newScan.modifyFileSelection(selection); + newScan.readFooterFromEntries(); + return newScan; + } + + @Override @JsonIgnore public boolean canPushdownProjects(List<SchemaPath> columns) { return true; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index c54772f..388d057 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -474,4 +474,5 @@ public class TestExampleQueries extends BaseTestQuery{ public void testCase() throws Exception { test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end from cp.`tpch/nation.parquet` ;"); } + }