DRILL-1386: Push directory filters into group scan for filesystem queries.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9628f9bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9628f9bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9628f9bb

Branch: refs/heads/smp-merge-230914
Commit: 9628f9bb5b8c5a9d842743bc2d127b3306be5c4b
Parents: 8def6e9
Author: Aman Sinha <asi...@maprtech.com>
Authored: Thu Sep 11 13:02:16 2014 -0700
Committer: Aman Sinha <asi...@maprtech.com>
Committed: Wed Sep 24 13:22:58 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   3 +-
 .../physical/base/AbstractFileGroupScan.java    |  43 ++++
 .../exec/physical/base/AbstractGroupScan.java   |   7 +
 .../drill/exec/physical/base/FileGroupScan.java |  29 +++
 .../drill/exec/physical/base/GroupScan.java     |   5 +
 .../exec/planner/common/DrillScanRelBase.java   |  10 +
 .../exec/planner/logical/DirPathBuilder.java    | 257 +++++++++++++++++++
 .../DrillPushPartitionFilterIntoScan.java       | 138 ++++++++++
 .../exec/planner/logical/DrillRuleSets.java     |   1 +
 .../exec/planner/logical/DrillScanRel.java      |  18 +-
 .../exec/planner/physical/PlannerSettings.java  |   4 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../exec/store/dfs/easy/EasyGroupScan.java      |  34 ++-
 .../exec/store/parquet/ParquetGroupScan.java    |  22 +-
 .../org/apache/drill/TestExampleQueries.java    |   1 +
 15 files changed, 560 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 933bfbe..c210541 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -66,7 +66,6 @@ public interface ExecConstants {
   public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = 
"drill.exec.sort.external.spill.fs";
   public static final String TEXT_LINE_READER_BATCH_SIZE = 
"drill.exec.storage.file.text.batch.size";
   public static final String TEXT_LINE_READER_BUFFER_SIZE = 
"drill.exec.storage.file.text.buffer.size";
-  public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = 
"drill.exec.storage.file.partition.column.label";
   public static final String HAZELCAST_SUBNETS = 
"drill.exec.cache.hazel.subnets";
   public static final String TOP_LEVEL_MAX_ALLOC = "drill.exec.memory.top.max";
   public static final String HTTP_ENABLE = "drill.exec.http.enabled";
@@ -88,6 +87,8 @@ public interface ExecConstants {
   public static String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
   public static OptionValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new 
BooleanValidator(JSON_ALL_TEXT_MODE, false);
 
+  public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = 
"drill.exec.storage.file.partition.column.label";
+  public static final OptionValidator 
FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new 
StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir");
 
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final OptionValidator SLICE_TARGET_OPTION = new 
PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE, 100000);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
new file mode 100644
index 0000000..ee809fc
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.store.dfs.FileSelection;
+
+public abstract class AbstractFileGroupScan extends AbstractGroupScan 
implements FileGroupScan {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
+
+
+  @Override
+  public void modifyFileSelection(FileSelection selection) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FileGroupScan clone(FileSelection selection) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean supportsPartitionFilterPushdown() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index c9b8e80..5d0d9bf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -61,11 +61,18 @@ public abstract class AbstractGroupScan extends 
AbstractBase implements GroupSca
     return 0;
   }
 
+  @Override
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return false;
   }
 
+  @Override
+  @JsonIgnore
+  public boolean supportsPartitionFilterPushdown() {
+    return false;
+  }
+
   /**
    * By default, throw exception, since group scan does not have exact column 
value count.
    */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
new file mode 100644
index 0000000..552d1e8
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.store.dfs.FileSelection;
+
+public interface FileGroupScan extends GroupScan {
+
+  public void modifyFileSelection(FileSelection selection);
+
+  public FileGroupScan clone(FileSelection selection) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index a88a5ec..2f94995 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -68,4 +68,9 @@ public interface GroupScan extends Scan, HasAffinity{
    */
   public long getColumnValueCount(SchemaPath column);
 
+  /**
+   * Whether or not this GroupScan supports pushdown of partition filters 
(directories for filesystems)
+   */
+  public boolean supportsPartitionFilterPushdown();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index 0934818..33581a3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -38,4 +38,14 @@ public abstract class DrillScanRelBase extends 
TableAccessRelBase implements Dri
     assert drillTable != null;
   }
 
+  public DrillScanRelBase(Convention convention, RelOptCluster cluster, 
RelTraitSet traits, RelOptTable table,
+      DrillTable drillTable) {
+    super(cluster, traits, table);
+    this.drillTable = drillTable;
+  }
+
+  public DrillTable getDrillTable() {
+    return drillTable;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
new file mode 100644
index 0000000..f911d7c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexCorrelVariable;
+import org.eigenbase.rex.RexDynamicParam;
+import org.eigenbase.rex.RexFieldAccess;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexLocalRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.eigenbase.rex.RexRangeRef;
+import org.eigenbase.rex.RexUtil;
+import org.eigenbase.rex.RexVisitorImpl;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlSyntax;
+
+import com.google.common.collect.Lists;
+
+public class DirPathBuilder extends RexVisitorImpl <SchemaPath> {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DirPathBuilder.class);
+
+  static final int MAX_NESTED_SUBDIRS = 10;          // allow up to 10 nested 
sub-directories
+  static final String EMPTY_STRING = "";
+
+  final private DrillFilterRel filterRel;
+  final private DrillScanRelBase scanRel;
+  final private RexBuilder builder;
+  final private String dirLabel;
+
+  private List<String> dirNameList = 
Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS);
+  private List<RexNode> conjunctList = 
Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS);
+  private List<String> dirPathList = Lists.newArrayList();
+  private RexNode currentConjunct = null;    // the current conjunct we are 
evaluating during visitor traversal
+  private RexNode finalCondition = null;     // placeholder for the final 
filter condition
+  private boolean dirMatch = false;
+
+  DirPathBuilder(DrillFilterRel filterRel, DrillScanRelBase scanRel, 
RexBuilder builder, String dirLabel) {
+    super(true);
+    this.filterRel = filterRel;
+    this.scanRel = scanRel;
+    this.builder = builder;
+    this.dirLabel = dirLabel;
+    this.finalCondition = filterRel.getCondition();
+  }
+
+  private void initPathComponents() {
+    for (int i=0; i < MAX_NESTED_SUBDIRS; i++) {
+      dirNameList.add(EMPTY_STRING);
+      conjunctList.add(null);
+    }
+  }
+
+  /**
+   * Build a directory path string for filter conditions containing directory 
filters.
+   * For example, suppose we have directory hierarchy:
+   * {orders/2012/Jan...Dec, orders/2013/Jan...Dec, orders/2014/Jan...Dec}
+   * path will be built for following types of filters (More types of filters 
will be added in the future):
+   *    1. SELECT * FROM <path>/orders WHERE o_custkey = 5 AND dir0 = '2014' 
AND dir1 = 'June'
+   *    2. SELECT * FROM <path>/orders WHERE (dir0 = '2013' AND dir1 = 'June') 
OR (dir0 = '2014' AND dir1 = 'June')
+   * For (1) dirPath =  <path>/orders/2014/June
+   * For (2) there are 2 dirPaths: {<path>/orders/2013/June, 
<path>/orders/2014/June}
+   * @return The list of strings corresponding to directory paths
+   */
+  public List<String> getDirPath() {
+    List<RexNode> disjuncts = 
RelOptUtil.disjunctions(filterRel.getCondition());
+    boolean buildDisjunction = false;
+    List<RexNode> newDisjunctList = Lists.newArrayList();
+
+    for (RexNode d : disjuncts) {  // process the top-level disjuncts
+      List<RexNode> conjuncts = RelOptUtil.conjunctions(d);
+      String dirPath = EMPTY_STRING;
+      initPathComponents();
+
+      boolean buildConjunction = false;
+
+      // go through the conjuncts to identify the directory filters
+      for (RexNode c : conjuncts) {
+        currentConjunct = c;
+        SchemaPath expr = c.accept(this);
+
+        if (expr != null) {
+          logger.debug("Found directory filter: " + 
expr.getRootSegment().getPath());
+        }
+      }
+
+      String prevPath = dirNameList.get(0);
+
+      // compose the final path string
+      for (int i = 0; i < dirNameList.size(); i++) {
+        String path = dirNameList.get(i);
+        if (i > 0) {
+          prevPath = dirNameList.get(i-1);
+        }
+        // Check if both the current path and the previous path are non-empty; 
currently
+        // we will only push a dir<N> filter if dir<N-1> filter is also 
specified
+        if (!path.equals(EMPTY_STRING) && !prevPath.equals(EMPTY_STRING)) {
+          dirPath += "/" + path;
+
+          // since we are pushing this directory filter we should remove it 
from the
+          // list of conjuncts
+          RexNode thisConjunct = conjunctList.get(i);
+          conjuncts.remove(thisConjunct);
+          buildConjunction = true;
+        }
+      }
+      if (!dirPath.equals(EMPTY_STRING)) {
+        dirPathList.add(dirPath);
+      }
+      if (buildConjunction) {
+        RexNode newConjunct = RexUtil.composeConjunction(builder, conjuncts, 
false);
+        newDisjunctList.add(newConjunct);
+        buildDisjunction = true;
+      }
+
+    } // for (disjuncts)
+
+    if (buildDisjunction) {
+      this.finalCondition = RexUtil.composeDisjunction(builder, 
newDisjunctList, false);
+    }
+    return dirPathList;
+  }
+
+  public RexNode getFinalCondition() {
+    return finalCondition;
+  }
+
+  @Override
+  public SchemaPath visitInputRef(RexInputRef inputRef) {
+    final int index = inputRef.getIndex();
+    final RelDataTypeField field = 
scanRel.getRowType().getFieldList().get(index);
+    return FieldReference.getWithQuotedRef(field.getName());
+  }
+
+  @Override
+  public SchemaPath visitCall(RexCall call) {
+    logger.debug("RexCall {}, {}", call);
+    final SqlSyntax syntax = call.getOperator().getSyntax();
+    switch (syntax) {
+    case BINARY:
+      if (call.getKind() == SqlKind.EQUALS) {
+        dirMatch = false;
+        // TODO: an assumption here is that the binary predicate is of the 
form <column> = <value>.
+        // In order to handle predicates of the form '<value> = <column>' we 
would need to canonicalize
+        // the predicate first before calling this function.
+        SchemaPath e1 = call.getOperands().get(0).accept(this);
+
+        if (dirMatch && e1 != null) {
+          // get the index for the 'dir<N>' filter
+          String dirName = e1.getRootSegment().getPath();
+          String suffix = dirName.substring(dirLabel.length()); // get the 
numeric suffix from 'dir<N>'
+          int suffixIndex = Integer.parseInt(suffix);
+
+          if (suffixIndex >= MAX_NESTED_SUBDIRS) {
+            return null;
+          }
+
+          // SchemaPath e2 = call.getOperands().get(1).accept(this);
+          if (call.getOperands().get(1).getKind() == SqlKind.LITERAL) {
+            String e2 = 
((RexLiteral)call.getOperands().get(1)).getValue2().toString();
+            dirNameList.set(suffixIndex, e2);
+            // dirNameList.set(suffixIndex, e2.getRootSegment().getPath());
+            conjunctList.set(suffixIndex, currentConjunct);
+            return e1;
+          }
+        }
+      }
+
+      return null;
+
+    case SPECIAL:
+      switch(call.getKind()) {
+      case CAST:
+        return getInputFromCast(call);
+
+      default:
+
+      }
+      // fall through
+    default:
+      // throw new AssertionError("Unexpected expression");
+
+    }
+    return null;
+  }
+
+  private SchemaPath getInputFromCast(RexCall call){
+    SchemaPath arg = call.getOperands().get(0).accept(this);
+
+    if (arg != null && 
arg.getRootSegment().getPath().matches(dirLabel+"[0-9]")) {
+      dirMatch = true;
+      return arg;
+    }
+    return null;
+  }
+
+  @Override
+  public SchemaPath visitLocalRef(RexLocalRef localRef) {
+    return null;
+  }
+
+  @Override
+  public SchemaPath visitOver(RexOver over) {
+    return null;
+  }
+
+  @Override
+  public SchemaPath visitCorrelVariable(RexCorrelVariable correlVariable) {
+    return null;
+  }
+
+  @Override
+  public SchemaPath visitDynamicParam(RexDynamicParam dynamicParam) {
+    return null;
+  }
+
+  @Override
+  public SchemaPath visitRangeRef(RexRangeRef rangeRef) {
+    return null;
+  }
+
+  @Override
+  public SchemaPath visitFieldAccess(RexFieldAccess fieldAccess) {
+    return super.visitFieldAccess(fieldAccess);
+  }
+
+  @Override
+  public SchemaPath visitLiteral(RexLiteral literal) {
+    return FieldReference.getWithQuotedRef(literal.getValue2().toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
new file mode 100644
index 0000000..940b6c2
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.logical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.physical.base.FileGroupScan;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexNode;
+
+import com.google.common.collect.Lists;
+
+public class DrillPushPartitionFilterIntoScan extends RelOptRule {
+  public static final RelOptRule INSTANCE = new 
DrillPushPartitionFilterIntoScan();
+
+  private DrillPushPartitionFilterIntoScan() {
+    super(RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.any(DrillScanRel.class)), "DrillPushPartitionFilterIntoScan");
+  }
+
+  private FormatSelection splitFilter(FormatSelection origSelection, 
DirPathBuilder builder) {
+
+    List<String> origFiles = origSelection.getAsFiles();
+    String pathPrefix = origSelection.getSelection().selectionRoot;
+
+    List<String> newFiles = Lists.newArrayList();
+
+    List<String> dirPathList = builder.getDirPath();
+
+    for (String dirPath : dirPathList) {
+      String fullPath = pathPrefix + dirPath;
+      // check containment of this path in the list of files
+      for (String origFilePath : origFiles) {
+        String[] components = origFilePath.split(":"); // some paths are of 
the form 'file:<path>', so we need to split
+        assert (components.length <= 2);
+        String origFileName = "";
+        if (components.length == 1) {
+          origFileName = components[0];
+        } else if (components.length == 2) {
+          origFileName = components[1];
+        } else {
+          assert false ;
+        }
+        if (origFileName.startsWith(fullPath)) {
+          newFiles.add(origFileName);
+        }
+      }
+    }
+
+    if (newFiles.size() > 0) {
+      FileSelection newFileSelection = new FileSelection(newFiles, 
origSelection.getSelection().selectionRoot, true);
+      FormatSelection newFormatSelection = new 
FormatSelection(origSelection.getFormat(), newFileSelection);
+      return newFormatSelection;
+    }
+
+    return origSelection;
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final DrillScanRel scan = (DrillScanRel) call.rel(1);
+    return scan.getGroupScan().supportsPartitionFilterPushdown();
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+    final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    DirPathBuilder builder = new DirPathBuilder(filterRel, scanRel, 
filterRel.getCluster().getRexBuilder(), settings.getFsPartitionColumnLabel());
+
+    FormatSelection origSelection = 
(FormatSelection)scanRel.getDrillTable().getSelection();
+    FormatSelection newSelection = splitFilter(origSelection, builder);
+
+    if (origSelection == newSelection) {
+      return; // no directory filter was pushed down
+    }
+
+    RexNode newFilterCondition = builder.getFinalCondition();
+
+    try {
+      FileGroupScan fgscan = 
((FileGroupScan)scanRel.getGroupScan()).clone(newSelection.getSelection());
+
+      if (newFilterCondition.isAlwaysTrue()) {
+        // TODO: temporarily keep the original filter until we resolve bugs
+        newFilterCondition = filterRel.getCondition();
+      }
+        /*
+        final DrillScanRel newScanRel =
+            new DrillScanRel(scanRel.getCluster(),
+                scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+                scanRel.getTable(),
+                fgscan,
+                filterRel.getRowType(),
+                scanRel.getColumns());
+        call.transformTo(newScanRel);
+      } else {
+      */
+      final DrillScanRel newScanRel =
+          new DrillScanRel(scanRel.getCluster(),
+              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+              scanRel.getTable(),
+              fgscan,
+              scanRel.getRowType(),
+              scanRel.getColumns());
+      final DrillFilterRel newFilterRel = new 
DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(), newScanRel, 
newFilterCondition);
+      call.transformTo(newFilterRel);
+      // }
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e) ;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 1d3ce9a..ff1d5e4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -97,6 +97,7 @@ public class DrillRuleSets {
 //      PushSortPastProjectRule.INSTANCE, //
 
       DrillPushProjIntoScan.INSTANCE,
+      DrillPushPartitionFilterIntoScan.INSTANCE,
 
       ////////////////////////////////
       DrillScanRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 5a0cc08..45e1058 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -52,6 +52,7 @@ public class DrillScanRel extends DrillScanRelBase implements 
DrillRel {
 
   final private RelDataType rowType;
   private GroupScan groupScan;
+  private List<SchemaPath> columns;
 
   /** Creates a DrillScan. */
   public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
@@ -66,13 +67,23 @@ public class DrillScanRel extends DrillScanRelBase 
implements DrillRel {
       RelOptTable table, RelDataType rowType, List<SchemaPath> columns) {
     super(DRILL_LOGICAL, cluster, traits, table);
     this.rowType = rowType;
-    columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : 
columns;
+    this.columns = columns == null || columns.size() == 0 ? 
GroupScan.ALL_COLUMNS : columns;
     try {
-      this.groupScan = drillTable.getGroupScan().clone(columns);
+      this.groupScan = drillTable.getGroupScan().clone(this.columns);
     } catch (IOException e) {
       throw new DrillRuntimeException("Failure creating scan.", e);
     }
   }
+
+  /** Creates a DrillScanRel for a particular GroupScan */
+  public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
+      RelOptTable table, GroupScan groupScan, RelDataType rowType, 
List<SchemaPath> columns) {
+    super(DRILL_LOGICAL, cluster, traits, table);
+    this.rowType = rowType;
+    this.columns = columns;
+    this.groupScan = groupScan;
+  }
+
 //
 //  private static GroupScan getCopy(GroupScan scan){
 //    try {
@@ -82,6 +93,9 @@ public class DrillScanRel extends DrillScanRelBase implements 
DrillRel {
 //    }
 //  }
 
+  public List<SchemaPath> getColumns() {
+    return this.columns;
+  }
 
   @Override
   public LogicalOperator implement(DrillImplementor implementor) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 5ec52c9..a036a46 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -121,6 +121,10 @@ public class PlannerSettings implements Context{
     return 
options.getOption(ExecConstants.ENABLE_MEMORY_ESTIMATION_KEY).bool_val;
   }
 
+  public String getFsPartitionColumnLabel() {
+    return 
options.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+  }
+
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4fa61e1..cfb7d08 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -63,6 +63,7 @@ public class SystemOptionManager implements OptionManager {
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
+      ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
       ExecConstants.SLICE_TARGET_OPTION,
       ExecConstants.AFFINITY_FACTOR,
       ExecConstants.MAX_WIDTH_GLOBAL,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 8efcd2c..35d1868 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -25,7 +25,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
@@ -49,12 +50,12 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 
 @JsonTypeName("fs-scan")
-public class EasyGroupScan extends AbstractGroupScan{
+public class EasyGroupScan extends AbstractFileGroupScan{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
 
-  private final FileSelection selection;
+  private FileSelection selection;
   private final EasyFormatPlugin<?> formatPlugin;
-  private final int maxWidth;
+  private int maxWidth;
   private List<SchemaPath> columns;
 
   private ListMultimap<Integer, CompleteFileWork> mappings;
@@ -92,10 +93,7 @@ public class EasyGroupScan extends AbstractGroupScan{
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to 
load format plugin for provided format config.");
     this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
     this.selectionRoot = selectionRoot;
-    BlockMapBuilder b = new 
BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), 
formatPlugin.getContext().getBits());
-    this.chunks = 
b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), 
formatPlugin.isBlockSplittable());
-    this.maxWidth = chunks.size();
-    this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+    initFromSelection(selection, formatPlugin);
   }
 
   private EasyGroupScan(EasyGroupScan that) {
@@ -110,6 +108,14 @@ public class EasyGroupScan extends AbstractGroupScan{
     mappings = that.mappings;
   }
 
+  private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> 
formatPlugin) throws IOException {
+    this.selection = selection;
+    BlockMapBuilder b = new 
BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), 
formatPlugin.getContext().getBits());
+    this.chunks = 
b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), 
formatPlugin.isBlockSplittable());
+    this.maxWidth = chunks.size();
+    this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+  }
+
   public String getSelectionRoot() {
     return selectionRoot;
   }
@@ -148,6 +154,11 @@ public class EasyGroupScan extends AbstractGroupScan{
   }
 
   @Override
+  public void modifyFileSelection(FileSelection selection) {
+    this.selection = selection;
+  }
+
+  @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
     assert children == null || children.isEmpty();
     return new EasyGroupScan(this);
@@ -222,6 +233,13 @@ public class EasyGroupScan extends AbstractGroupScan{
   }
 
   @Override
+  public FileGroupScan clone(FileSelection selection) throws IOException {
+    EasyGroupScan newScan = new EasyGroupScan(this);
+    newScan.initFromSelection(selection, formatPlugin);
+    return newScan;
+  }
+
+  @Override
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return formatPlugin.supportsPushDown();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 86e5224..6d3cf5a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -31,13 +31,15 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -70,7 +72,7 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 
 @JsonTypeName("parquet-scan")
-public class ParquetGroupScan extends AbstractGroupScan {
+public class ParquetGroupScan extends AbstractFileGroupScan {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = 
MetricRegistry.name(ParquetGroupScan.class, "readFooter");
@@ -249,6 +251,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
     return this.fs;
   }
 
+  @Override
+  public void modifyFileSelection(FileSelection selection) {
+    entries.clear();
+    for (String fileName : selection.getAsFiles()) {
+      entries.add(new ReadEntryWithPath(fileName));
+    }
+  }
+
   public static class RowGroupInfo extends ReadEntryFromHDFS implements 
CompleteWork, FileWork {
 
     private EndpointByteMap byteMap;
@@ -388,6 +398,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
   }
 
   @Override
+  public FileGroupScan clone(FileSelection selection) throws IOException {
+    ParquetGroupScan newScan = new ParquetGroupScan(this);
+    newScan.modifyFileSelection(selection);
+    newScan.readFooterFromEntries();
+    return newScan;
+  }
+
+  @Override
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9628f9bb/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index c54772f..388d057 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -474,4 +474,5 @@ public class TestExampleQueries extends BaseTestQuery{
   public void testCase() throws Exception {
     test("select case when n_nationkey > 0 and n_nationkey < 2 then 
concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' 
else concat(n_name,'_XYZ') end from cp.`tpch/nation.parquet` ;");
   }
+
 }

Reply via email to