DRILL-3121: Add support for interpreter-based partition pruning for Hive tables.
Remove the old partition pruning logic.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6d5d7cc2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6d5d7cc2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6d5d7cc2

Branch: refs/heads/master
Commit: 6d5d7cc27140963291e76eac72218b862aa6af45
Parents: 95d576d
Author: Mehant Baid <meha...@gmail.com>
Authored: Sat Jul 25 00:28:55 2015 -0700
Committer: Mehant Baid <meha...@gmail.com>
Committed: Thu Jul 30 20:23:52 2015 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    | 171 ++++++++++--
 .../exec/planner/sql/HivePartitionLocation.java |  54 ++++
 .../HivePushPartitionFilterIntoScan.java        | 150 ++++-------
 .../exec/store/hive/HiveDataTypeUtility.java    | 122 +++++++++
 .../drill/exec/store/hive/HiveRecordReader.java |  48 +---
 .../exec/store/hive/HiveStoragePlugin.java      |   2 +-
 .../apache/drill/exec/store/hive/HiveTable.java |  22 +-
 .../drill/exec/TestHivePartitionPruning.java    |  28 +-
 .../exec/planner/logical/DirPathBuilder.java    | 265 -------------------
 .../DrillPushPartitionFilterIntoScan.java       | 139 ----------
 .../planner/logical/PartitionPruningUtil.java   |  78 ------
 11 files changed, 424 insertions(+), 655 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index 93bfadd..d323db9 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -17,29 +17,54 @@
  */
 package org.apache.drill.exec.planner.sql;
 
+import org.apache.calcite.util.BitSets;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.hive.HiveDataTypeUtility;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
+import org.apache.drill.exec.store.hive.HiveScan;
 import org.apache.drill.exec.store.hive.HiveTable;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormatter;
 
+import java.sql.Timestamp;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
 // Partition descriptor for hive tables
 public class HivePartitionDescriptor implements PartitionDescriptor {
 
   private final Map<String, Integer> partitionMap = new HashMap<>();
   private final int MAX_NESTED_SUBDIRS;
+  private final DrillScanRel scanRel;
 
-  public HivePartitionDescriptor(List<HiveTable.FieldSchemaWrapper> 
partitionName) {
+  public HivePartitionDescriptor(PlannerSettings settings, DrillScanRel 
scanRel) {
     int i = 0;
-    for (HiveTable.FieldSchemaWrapper wrapper : partitionName) {
+    this.scanRel = scanRel;
+    for (HiveTable.FieldSchemaWrapper wrapper : ((HiveScan) 
scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
       partitionMap.put(wrapper.name, i);
       i++;
     }
@@ -61,37 +86,145 @@ public class HivePartitionDescriptor implements PartitionDescriptor {
     return MAX_NESTED_SUBDIRS;
   }
 
-  @Override
-  public Integer getIdIfValid(String name) {
-    return partitionMap.get(name);
-  }
-
-  /*
-   * Following method stubs are just added to satisfy the interface 
implementation.
-   * Actual implementation will be added when hive partition pruning is 
plugged in
-   * as part of DRILL-3121
-   */
-  private String getBaseTableLocation() {
-    return null;
+  public String getBaseTableLocation() {
+    HiveReadEntry origEntry = ((HiveScan) 
scanRel.getGroupScan()).hiveReadEntry;
+    return origEntry.table.getTable().getSd().getLocation();
   }
 
   @Override
-  public GroupScan createNewGroupScan(List<String> newFiles) throws Exception {
-    return null;
+  public GroupScan createNewGroupScan(List<String> newFiles) throws 
ExecutionSetupException {
+    HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+    HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
+    List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
+    List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
+
+    for (HiveTable.HivePartition part: oldPartitions) {
+      String partitionLocation = part.getPartition().getSd().getLocation();
+      for (String newPartitionLocation: newFiles) {
+        if (partitionLocation.equals(newPartitionLocation)) {
+          newPartitions.add(part);
+        }
+      }
+    }
+
+    HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, 
newPartitions, origReadEntry.hiveConfigOverride);
+    HiveScan newScan = new HiveScan(hiveScan.getUserName(), newReadEntry, 
hiveScan.storagePlugin, hiveScan.columns);
+    return newScan;
   }
 
   @Override
   public List<PartitionLocation> getPartitions() {
-    return null;
+    List<PartitionLocation> partitions = new LinkedList<>();
+    HiveReadEntry origEntry = ((HiveScan) 
scanRel.getGroupScan()).hiveReadEntry;
+    List<String> allFileLocations = new LinkedList<>();
+    for (Partition partition: origEntry.getPartitions()) {
+      allFileLocations.add(partition.getSd().getLocation());
+    }
+    for (String file: allFileLocations) {
+      partitions.add(new HivePartitionLocation(MAX_NESTED_SUBDIRS, 
getBaseTableLocation(),file));
+    }
+    return partitions;
   }
 
   @Override
-  public void populatePartitionVectors(ValueVector[] vectors, 
List<PartitionLocation> partitions, BitSet partitionColumnBitSet, Map<Integer, 
String> fieldNameMap) {
+  public void populatePartitionVectors(ValueVector[] vectors, 
List<PartitionLocation> partitions,
+                                       BitSet partitionColumnBitSet, 
Map<Integer, String> fieldNameMap) {
+    int record = 0;
+    for(PartitionLocation partitionLocation: partitions) {
+      for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+        populateVector(vectors[partitionColumnIndex], 
partitionLocation.getPartitionValue(partitionColumnIndex),
+            record);
+      }
+      record++;
+    }
 
+    for(ValueVector v : vectors) {
+      if(v == null){
+        continue;
+      }
+      v.getMutator().setValueCount(partitions.size());
+    }
   }
 
   @Override
   public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings 
plannerSettings) {
-    return null;
+    HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+    String partitionName = column.getAsNamePart().getName();
+    Map<String, String> partitionNameTypeMap = 
hiveScan.hiveReadEntry.table.getPartitionNameTypeMap();
+    String hiveType = partitionNameTypeMap.get(partitionName);
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) 
TypeInfoUtils.getTypeInfoFromTypeString(hiveType);
+
+    TypeProtos.MinorType partitionType = 
HiveDataTypeUtility.getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo,
+        plannerSettings.getOptions());
+    return 
TypeProtos.MajorType.newBuilder().setMode(TypeProtos.DataMode.OPTIONAL).setMinorType(partitionType).build();
+  }
+
+  @Override
+  public Integer getIdIfValid(String name) {
+    return partitionMap.get(name);
+  }
+
+  public static void populateVector(ValueVector vector, String value, int 
record) {
+    TypeProtos.MinorType type = vector.getField().getType().getMinorType();
+
+    switch (type) {
+      case TINYINT:
+      case SMALLINT:
+      case INT:
+        if (value == null) {
+          ((NullableIntVector) vector).getMutator().setNull(record);
+        } else {
+          ((NullableIntVector) vector).getMutator().setSafe(record, 
Integer.parseInt(value));
+        }
+        break;
+
+      case BIGINT:
+        if (value == null) {
+          ((NullableBigIntVector) vector).getMutator().setNull(record);
+        } else {
+          ((NullableBigIntVector) vector).getMutator().setSafe(record, 
Long.parseLong(value));
+        }
+        break;
+      case FLOAT4:
+        if (value == null) {
+          ((NullableFloat4Vector) vector).getMutator().setNull(record);
+        } else {
+          ((NullableFloat4Vector) vector).getMutator().setSafe(record, 
Float.parseFloat(value));
+        }
+        break;
+      case FLOAT8:
+        if (value == null) {
+          ((NullableFloat8Vector) vector).getMutator().setNull(record);
+        } else {
+          ((NullableFloat8Vector) vector).getMutator().setSafe(record, 
Double.parseDouble(value));
+        }
+        break;
+      case TIMESTAMP:
+        if (value == null) {
+          ((NullableTimeStampVector) vector).getMutator().setNull(record);
+        } else {
+          DateTimeFormatter f = DateUtility.getDateTimeFormatter();
+          value = value.replace("%3A", ":");
+          long ts = DateTime.parse(value, 
f).withZoneRetainFields(DateTimeZone.UTC).getMillis();
+          ((NullableTimeStampVector) vector).getMutator().set(record, ts);
+        }
+        break;
+      case DATE:
+        if (value == null) {
+          ((NullableDateVector) vector).getMutator().setNull(record);
+        } else {
+          DateTimeFormatter f = DateUtility.formatDate;
+          long ts = DateTime.parse(value, 
f).withZoneRetainFields(DateTimeZone.UTC).getMillis();
+          ((NullableDateVector) vector).getMutator().set(record, ts);
+        }
+        break;
+      case VARCHAR:
+        if (value == null) {
+          ((NullableVarCharVector) vector).getMutator().setNull(record);
+        } else {
+          ((NullableVarCharVector) vector).getMutator().set(record, 
value.getBytes());
+        }
+        break;
+    }
   }
 }
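
The TIMESTAMP branch of populateVector above first undoes the %3A escaping that Hive applies to ':' in partition directory names and then parses the value with Joda-Time. A minimal, standalone sketch of that conversion (the directory value and the yyyy-MM-dd HH:mm:ss pattern are illustrative; Drill's DateUtility formatter is not reproduced here):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PartitionTimestampParseExample {
  public static void main(String[] args) {
    // Hive escapes ':' as %3A in partition directory names; undo that before parsing.
    String value = "2014-06-05 10%3A30%3A00".replace("%3A", ":");
    DateTimeFormatter f = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); // assumed pattern
    // Interpret the fields as UTC and take the epoch millis that would be stored
    // into the NullableTimeStampVector.
    long millis = DateTime.parse(value, f).withZoneRetainFields(DateTimeZone.UTC).getMillis();
    System.out.println(millis);
  }
}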

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
new file mode 100644
index 0000000..e3066a4
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import org.apache.drill.exec.planner.PartitionLocation;
+
+public class HivePartitionLocation implements PartitionLocation {
+  private final String partitionLocation;
+  private final String[] partitionValue;
+  private final String fileSeparator = System.getProperty("file.separator");
+  public HivePartitionLocation(int max, String baseTableLocation, String 
entireLocation) {
+    this.partitionLocation = entireLocation;
+    partitionValue = new String[max];
+    int start = partitionLocation.indexOf(baseTableLocation) + 
baseTableLocation.length();
+    String postPath = entireLocation.substring(start);
+    if (postPath.length() == 0) {
+      return;
+    }
+    if (postPath.startsWith(fileSeparator)) {
+      postPath = postPath.substring(postPath.indexOf(fileSeparator) + 1);
+    }
+    String[] mostDirs = postPath.split(fileSeparator);
+    assert mostDirs.length <= max;
+    for (int i = 0; i < mostDirs.length; i++) {
+      this.partitionValue[i] = mostDirs[i].substring(mostDirs[i].indexOf("=") 
+ 1);
+    }
+
+  }
+  @Override
+  public String getPartitionValue(int index) {
+    assert index < partitionValue.length;
+    return partitionValue[index];
+  }
+
+  @Override
+  public String getEntirePartitionLocation() {
+    return partitionLocation;
+  }
+}
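
HivePartitionLocation derives one partition value per directory level by taking the path below the table's base location and keeping the text after '=' in each directory name. A standalone sketch of that parsing, using a hypothetical table location and '/' in place of the file.separator property:

import java.util.Arrays;

public class PartitionPathParseExample {
  public static void main(String[] args) {
    String baseTableLocation = "/user/hive/warehouse/orders";                 // hypothetical
    String entireLocation = "/user/hive/warehouse/orders/year=2014/month=06"; // hypothetical

    // Text below the table root, without the leading separator.
    String postPath = entireLocation.substring(
        entireLocation.indexOf(baseTableLocation) + baseTableLocation.length());
    if (postPath.startsWith("/")) {
      postPath = postPath.substring(1);
    }
    // One "name=value" directory per partition level; keep only the value.
    String[] dirs = postPath.split("/");
    String[] partitionValues = new String[dirs.length];
    for (int i = 0; i < dirs.length; i++) {
      partitionValues[i] = dirs[i].substring(dirs[i].indexOf("=") + 1);
    }
    System.out.println(Arrays.toString(partitionValues)); // [2014, 06]
  }
}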

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 6ab1a78..90b0c5f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -18,125 +18,75 @@
 
 package org.apache.drill.exec.planner.sql.logical;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.logical.DirPathBuilder;
+import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.logical.DrillFilterRel;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.PartitionPruningUtil;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.HivePartitionDescriptor;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
-import org.apache.drill.exec.store.hive.HiveTable;
-import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
 import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-
-import com.google.common.collect.Lists;
-
-public abstract class HivePushPartitionFilterIntoScan extends 
StoragePluginOptimizerRule {
-
-  public static final StoragePluginOptimizerRule HIVE_FILTER_ON_PROJECT =
-      new HivePushPartitionFilterIntoScan(
-          RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
-          "HivePushPartitionFilterIntoScan:Filter_On_Project") {
 
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-          final DrillScanRel scan = (DrillScanRel) call.rel(2);
-          GroupScan groupScan = scan.getGroupScan();
-          return groupScan instanceof HiveScan &&  
groupScan.supportsPartitionFilterPushdown();
-        }
+public abstract class HivePushPartitionFilterIntoScan {
 
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-          final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-          final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
-          final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
-          doOnMatch(call, filterRel, projectRel, scanRel);
-        }
-      };
+  public static final StoragePluginOptimizerRule 
getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
+    return new PruneScanRule(
+        RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+        "HivePushPartitionFilterIntoScan:Filter_On_Project_Hive",
+        optimizerRulesContext) {
 
-  public static final StoragePluginOptimizerRule HIVE_FILTER_ON_SCAN =
-      new HivePushPartitionFilterIntoScan(
-          RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.any(DrillScanRel.class)),
-          "HivePushPartitionFilterIntoScan:Filter_On_Scan") {
-
-        @Override
-        public boolean matches(RelOptRuleCall call) {
-          final DrillScanRel scan = (DrillScanRel) call.rel(1);
-          GroupScan groupScan = scan.getGroupScan();
-          return groupScan instanceof HiveScan &&  
groupScan.supportsPartitionFilterPushdown();
-        }
-
-        @Override
-        public void onMatch(RelOptRuleCall call) {
-          final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-          final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
-          doOnMatch(call, filterRel, null, scanRel);
-        }
-      };
-
-  private HivePushPartitionFilterIntoScan(
-      RelOptRuleOperand operand,
-      String id) {
-    super(operand, id);
-  }
-
-  private HiveReadEntry splitFilter(HiveReadEntry origReadEntry, 
DirPathBuilder builder) {
-    HiveTable table = origReadEntry.table;
-    List<HivePartition> partitions = origReadEntry.partitions;
-    List<HivePartition> newPartitions = new LinkedList<>();
-    String pathPrefix = 
PartitionPruningUtil.truncatePrefixFromPath(table.getTable().getSd().getLocation());
-    List<String> newFiles = Lists.newArrayList();
-    List<String> dirPathList = builder.getDirPath();
-
-    for (String dirPath : dirPathList) {
-      String fullPath = pathPrefix + dirPath;
-      // check containment of this path in the list of files
-      for (HivePartition part: partitions) {
-        String origFilePath = origReadEntry.getPartitionLocation(part);
-        String origFileName = 
PartitionPruningUtil.truncatePrefixFromPath(origFilePath);
+      @Override
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings 
settings, DrillScanRel scanRel) {
+        return new HivePartitionDescriptor(settings, scanRel);
+      }
 
-        if (origFileName.startsWith(fullPath)) {
-          newFiles.add(origFileName);
-          newPartitions.add(part);
-        }
+      @Override
+      public boolean matches(RelOptRuleCall call) {
+        final DrillScanRel scan = (DrillScanRel) call.rel(2);
+        GroupScan groupScan = scan.getGroupScan();
+        // this rule is applicable only for Hive based partition pruning
+        return groupScan instanceof HiveScan && 
groupScan.supportsPartitionFilterPushdown();
       }
-    }
 
-    if (newFiles.size() > 0) {
-      HiveReadEntry newReadEntry = new HiveReadEntry(table, newPartitions, 
origReadEntry.hiveConfigOverride);
-      return newReadEntry;
-    }
-    return origReadEntry;
+      @Override
+      public void onMatch(RelOptRuleCall call) {
+        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+        final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
+        final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+        doOnMatch(call, filterRel, projectRel, scanRel);
+      }
+    };
   }
 
-  protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, 
DrillProjectRel projectRel, DrillScanRel scanRel) {
-    DrillRel inputRel = projectRel != null ? projectRel : scanRel;
-    HiveReadEntry origReadEntry = 
((HiveScan)scanRel.getGroupScan()).hiveReadEntry;
-    DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, 
filterRel.getCluster().getRexBuilder(), new 
HivePartitionDescriptor(origReadEntry.table.partitionKeys));
-    HiveReadEntry newReadEntry = splitFilter(origReadEntry, builder);
+  public static final StoragePluginOptimizerRule 
getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
+    return new PruneScanRule(
+        RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.any(DrillScanRel.class)),
+        "HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", 
optimizerRulesContext) {
 
-    if (origReadEntry == newReadEntry) {
-      return; // no directory filter was pushed down
-    }
+      @Override
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings 
settings, DrillScanRel scanRel) {
+        return new HivePartitionDescriptor(settings, scanRel);
+      }
 
-    try {
-      HiveScan oldScan = (HiveScan) scanRel.getGroupScan();
-      HiveScan hiveScan = new HiveScan(oldScan.getUserName(), newReadEntry, 
oldScan.storagePlugin, oldScan.columns);
-      PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, 
hiveScan, builder);
-    } catch (ExecutionSetupException e) {
-      throw new DrillRuntimeException(e);
-    }
+      @Override
+      public boolean matches(RelOptRuleCall call) {
+        final DrillScanRel scan = (DrillScanRel) call.rel(1);
+        GroupScan groupScan = scan.getGroupScan();
+        // this rule is applicable only for Hive based partition pruning
+        return groupScan instanceof HiveScan && 
groupScan.supportsPartitionFilterPushdown();
+      }
 
+      @Override
+      public void onMatch(RelOptRuleCall call) {
+        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+        doOnMatch(call, filterRel, null, scanRel);
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java
new file mode 100644
index 0000000..84d8790
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.holders.Decimal18Holder;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.DateVector;
+import org.apache.drill.exec.vector.Decimal18Vector;
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.Decimal9Vector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.TimeStampVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.work.ExecErrorConstants;
+
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+public class HiveDataTypeUtility {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveDataTypeUtility.class);
+
+  public static TypeProtos.MinorType 
getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
+                                                                           
OptionManager options) {
+    switch(primitiveTypeInfo.getPrimitiveCategory()) {
+      case BINARY:
+        return TypeProtos.MinorType.VARBINARY;
+      case BOOLEAN:
+        return TypeProtos.MinorType.BIT;
+      case DECIMAL: {
+
+        if 
(options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == 
false) {
+          throw UserException.unsupportedError()
+              .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
+              .build(logger);
+        }
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
+        return DecimalUtility.getDecimalDataType(decimalTypeInfo.precision());
+      }
+      case DOUBLE:
+        return TypeProtos.MinorType.FLOAT8;
+      case FLOAT:
+        return TypeProtos.MinorType.FLOAT4;
+      // TODO (DRILL-2470)
+      // Byte and short (tinyint and smallint in SQL types) are currently read 
as integers
+      // as these smaller integer types are not fully supported in Drill today.
+      case SHORT:
+      case BYTE:
+      case INT:
+        return TypeProtos.MinorType.INT;
+      case LONG:
+        return TypeProtos.MinorType.BIGINT;
+      case STRING:
+      case VARCHAR:
+        return TypeProtos.MinorType.VARCHAR;
+      case TIMESTAMP:
+        return TypeProtos.MinorType.TIMESTAMP;
+      case DATE:
+        return TypeProtos.MinorType.DATE;
+    }
+    
throwUnsupportedHiveDataTypeError(primitiveTypeInfo.getPrimitiveCategory().toString());
+    return null;
+  }
+
+  public static void throwUnsupportedHiveDataTypeError(String unsupportedType) 
{
+    StringBuilder errMsg = new StringBuilder();
+    errMsg.append(String.format("Unsupported Hive data type %s. ", 
unsupportedType));
+    errMsg.append(System.getProperty("line.separator"));
+    errMsg.append("Following Hive data types are supported in Drill for 
querying: ");
+    errMsg.append(
+        "BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, 
BINARY, DECIMAL, STRING, and VARCHAR");
+
+    throw new RuntimeException(errMsg.toString());
+  }
+}
+
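
Callers such as HivePartitionDescriptor.getVectorType obtain the PrimitiveTypeInfo argument from the Hive type string kept in the metastore. A small sketch of that lookup ("int" is just an illustrative type name; the OptionManager argument to getMinorTypeFromHivePrimitiveTypeInfo only matters for DECIMAL and is omitted here):

import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class HiveTypeStringExample {
  public static void main(String[] args) {
    // Partition column types arrive as strings from the metastore,
    // e.g. via HiveTable.getPartitionNameTypeMap().
    PrimitiveTypeInfo info =
        (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString("int");
    System.out.println(info.getPrimitiveCategory()); // INT
  }
}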

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 088fb74..1a66ad9 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -126,7 +126,7 @@ public class HiveRecordReader extends AbstractRecordReader {
   protected static final int FIELD_SIZE = 50;
 
   public HiveRecordReader(Table table, Partition partition, InputSplit 
inputSplit, List<SchemaPath> projectedColumns,
-      FragmentContext context, Map<String, String> hiveConfigOverride) throws 
ExecutionSetupException {
+                          FragmentContext context, Map<String, String> 
hiveConfigOverride) throws ExecutionSetupException {
     this.table = table;
     this.partition = partition;
     this.inputSplit = inputSplit;
@@ -353,53 +353,11 @@ public class HiveRecordReader extends AbstractRecordReader {
     }
   }
 
-  private MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo 
primitiveTypeInfo) {
-    switch(primitiveTypeInfo.getPrimitiveCategory()) {
-      case BINARY:
-        return TypeProtos.MinorType.VARBINARY;
-      case BOOLEAN:
-        return MinorType.BIT;
-      case DECIMAL: {
-
-        if 
(context.getOptions().getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val
 == false) {
-          throw UserException.unsupportedError()
-              .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
-              .build(logger);
-        }
-        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
-        return DecimalUtility.getDecimalDataType(decimalTypeInfo.precision());
-      }
-      case DOUBLE:
-        return MinorType.FLOAT8;
-      case FLOAT:
-        return MinorType.FLOAT4;
-      // TODO (DRILL-2470)
-      // Byte and short (tinyint and smallint in SQL types) are currently read 
as integers
-      // as these smaller integer types are not fully supported in Drill today.
-      case SHORT:
-      case BYTE:
-      case INT:
-        return MinorType.INT;
-      case LONG:
-        return MinorType.BIGINT;
-      case STRING:
-      case VARCHAR:
-        return MinorType.VARCHAR;
-      case TIMESTAMP:
-        return MinorType.TIMESTAMP;
-      case DATE:
-        return MinorType.DATE;
-    }
-
-    
throwUnsupportedHiveDataTypeError(primitiveTypeInfo.getPrimitiveCategory().toString());
-    return null;
-  }
-
   public MajorType getMajorTypeFromHiveTypeInfo(TypeInfo typeInfo, boolean 
nullable) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE: {
         PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
-        MinorType minorType = 
getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo);
+        MinorType minorType = 
HiveDataTypeUtility.getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo, 
context.getOptions());
         MajorType.Builder typeBuilder = 
MajorType.newBuilder().setMinorType(minorType)
             .setMode((nullable ? DataMode.OPTIONAL : DataMode.REQUIRED));
 
@@ -526,7 +484,7 @@ public class HiveRecordReader extends AbstractRecordReader {
   }
 
   private void populateDecimalPartitionVector(DecimalTypeInfo typeInfo, 
ValueVector vector, BigDecimal bigDecimal,
-      int recordCount) {
+                                              int recordCount) {
     int precision = typeInfo.precision();
     int scale = typeInfo.scale();
     if (precision <= 9) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 4813c5e..22552b7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -89,7 +89,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public Set<StoragePluginOptimizerRule> 
getOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return 
ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, 
HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN);
+    return 
ImmutableSet.of(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerRulesContext),
 HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerRulesContext));
   }
 
 }
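
For reference, a sketch of the wiring implied by the one-line change above: the plugin now builds its two pruning rules from the OptimizerRulesContext instead of using static rule instances (the enclosing class here is illustrative, not part of the commit):

import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;

class HivePruningRulesExample {
  // One rule instance per supported plan shape: Filter over Project over Scan,
  // and Filter directly over Scan.
  static Set<StoragePluginOptimizerRule> pruningRules(OptimizerRulesContext context) {
    return ImmutableSet.of(
        HivePushPartitionFilterIntoScan.getFilterOnProject(context),
        HivePushPartitionFilterIntoScan.getFilterOnScan(context));
  }
}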

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
index 99101cc..88fe8c3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hive;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -64,12 +65,15 @@ public class HiveTable {
   @JsonProperty
   public String tableType;
 
+  @JsonIgnore
+  public final Map<String, String> partitionNameTypeMap = new HashMap();
+
   @JsonCreator
   public HiveTable(@JsonProperty("tableName") String tableName, 
@JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, 
@JsonProperty("createTime") int createTime,
                    @JsonProperty("lastAccessTime") int lastAccessTime, 
@JsonProperty("retention") int retention, @JsonProperty("sd") 
StorageDescriptorWrapper sd,
                    @JsonProperty("partitionKeys") List<FieldSchemaWrapper> 
partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
                    @JsonProperty("viewOriginalText") String viewOriginalText, 
@JsonProperty("viewExpandedText") String viewExpandedText, 
@JsonProperty("tableType") String tableType
-                   ) {
+  ) {
     this.tableName = tableName;
     this.dbName = dbName;
     this.owner = owner;
@@ -86,10 +90,11 @@ public class HiveTable {
     List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
     for (FieldSchemaWrapper w : partitionKeys) {
       partitionKeysUnwrapped.add(w.getFieldSchema());
+      partitionNameTypeMap.put(w.name, w.type);
     }
     StorageDescriptor sdUnwrapped = sd.getSd();
     this.table = new Table(tableName, dbName, owner, createTime, 
lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
-            parameters, viewOriginalText, viewExpandedText, tableType);
+        parameters, viewOriginalText, viewExpandedText, tableType);
   }
 
   public HiveTable(Table table) {
@@ -107,6 +112,7 @@ public class HiveTable {
     this.partitionKeys = Lists.newArrayList();
     for (FieldSchema f : table.getPartitionKeys()) {
       this.partitionKeys.add(new FieldSchemaWrapper(f));
+      partitionNameTypeMap.put(f.getName(), f.getType());
     }
     this.parameters = table.getParameters();
     this.viewOriginalText = table.getViewOriginalText();
@@ -156,8 +162,8 @@ public class HiveTable {
 
     @JsonCreator
     public HivePartition(@JsonProperty("values") List<String> values, 
@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String 
dbName, @JsonProperty("createTime") int createTime,
-                     @JsonProperty("lastAccessTime") int lastAccessTime,  
@JsonProperty("sd") StorageDescriptorWrapper sd,
-                     @JsonProperty("parameters") Map<String, String> parameters
+                         @JsonProperty("lastAccessTime") int lastAccessTime,  
@JsonProperty("sd") StorageDescriptorWrapper sd,
+                         @JsonProperty("parameters") Map<String, String> 
parameters
     ) {
       this.values = values;
       this.tableName = tableName;
@@ -217,7 +223,7 @@ public class HiveTable {
     public int numBuckets;
     @JsonProperty
     public SerDeInfoWrapper serDeInfo;
-//    @JsonProperty
+    //    @JsonProperty
 //    public List<String> bucketCols;
     @JsonProperty
     public List<OrderWrapper> sortCols;
@@ -251,7 +257,7 @@ public class HiveTable {
 //      this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, 
outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
 //              bucketCols, sortColsUnwrapped, parameters);
       this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, 
outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
-              null, sortColsUnwrapped, parameters);
+          null, sortColsUnwrapped, parameters);
     }
 
     public StorageDescriptorWrapper(StorageDescriptor sd) {
@@ -369,4 +375,8 @@ public class HiveTable {
     }
   }
 
+  public Map<String, String> getPartitionNameTypeMap() {
+    return partitionNameTypeMap;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index c846328..0ea9d53 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -37,7 +37,7 @@ public class TestHivePartitionPruning extends HiveTestBase {
   /* Partition pruning is not supported for disjuncts that do not meet pruning 
criteria.
    * Will be enabled when we can do wild card comparison for partition pruning
    */
-  @Ignore
+  @Test
   public void testDisjunctsPartitionFilter() throws Exception {
     final String query = "explain plan for select * from 
hive.`default`.partition_pruning_test where (c = 1) or (d = 1)";
     final String plan = getPlanInString(query, OPTIQ_FORMAT);
@@ -55,7 +55,7 @@ public class TestHivePartitionPruning extends HiveTestBase {
     assertFalse(plan.contains("Filter"));
   }
 
-  @Ignore("DRILL-1571")
+  @Test
   public void testComplexFilter() throws Exception {
     final String query = "explain plan for select * from 
hive.`default`.partition_pruning_test where (c = 1 and d = 1) or (c = 2 and d = 
3)";
     final String plan = getPlanInString(query, OPTIQ_FORMAT);
@@ -63,4 +63,28 @@ public class TestHivePartitionPruning extends HiveTestBase {
     // Check and make sure that Filter is not present in the plan
     assertFalse(plan.contains("Filter"));
   }
+
+  @Test
+  public void testRangeFilter() throws Exception {
+    final String query = "explain plan for " +
+        "select * from hive.`default`.partition_pruning_test where " +
+        "c > 1 and d > 1";
+
+    final String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+    // Check and make sure that Filter is not present in the plan
+    assertFalse(plan.contains("Filter"));
+  }
+
+  @Test
+  public void testRangeFilterWithDisjunct() throws Exception {
+    final String query = "explain plan for " +
+        "select * from hive.`default`.partition_pruning_test where " +
+        "(c > 1 and d > 1) or (c < 2 and d < 2)";
+
+    final String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+    // Check and make sure that Filter is not present in the plan
+    assertFalse(plan.contains("Filter"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
deleted file mode 100644
index 892e8cb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.planner.PartitionDescriptor;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexDynamicParam;
-import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexOver;
-import org.apache.calcite.rex.RexRangeRef;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlSyntax;
-
-import com.google.common.collect.Lists;
-
-public class DirPathBuilder extends RexVisitorImpl <SchemaPath> {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DirPathBuilder.class);
-
-  static final String EMPTY_STRING = "";
-
-  final private DrillFilterRel filterRel;
-  final private DrillRel inputRel;
-  final private RexBuilder builder;
-  final private PartitionDescriptor partitionDescriptor;
-
-  private List<String> dirNameList;
-  private List<RexNode> conjunctList;
-  private List<String> dirPathList = Lists.newArrayList();
-  private final static List<String> emptyDirPathList = new ArrayList<>(0);
-  private RexNode currentConjunct = null;    // the current conjunct are we 
evaluating during visitor traversal
-  private RexNode finalCondition = null;     // placeholder for the final 
filter condition
-  private boolean dirMatch = false;
-
-  public DirPathBuilder(DrillFilterRel filterRel, DrillRel inputRel, 
RexBuilder builder, PartitionDescriptor partitionDescriptor) {
-    super(true);
-    this.filterRel = filterRel;
-    this.inputRel = inputRel;
-    this.builder = builder;
-    this.finalCondition = filterRel.getCondition();
-    this.partitionDescriptor = partitionDescriptor;
-  }
-
-  private void initPathComponents() {
-    int maxHierarchy = partitionDescriptor.getMaxHierarchyLevel();
-    dirNameList = Lists.newArrayListWithExpectedSize(maxHierarchy);
-    conjunctList = Lists.newArrayListWithExpectedSize(maxHierarchy);
-    for (int i=0; i < maxHierarchy; i++) {
-      dirNameList.add(EMPTY_STRING);
-      conjunctList.add(null);
-    }
-  }
-
-  /**
-   * Build a directory path string for filter conditions containing directory 
filters.
-   * For example, suppose we have directory hierarchy:
-   * {orders/2012/Jan...Dec, orders/2013/Jan...Dec, orders/2014/Jan...Dec}
-   * path will be built for following types of filters (More types of filters 
will be added in the future):
-   *    1. SELECT * FROM <path>/orders WHERE o_custkey = 5 AND dir0 = '2014' 
AND dir1 = 'June'
-   *    2. SELECT * FROM <path>/orders WHERE (dir0 = '2013' AND dir1 = 'June') 
OR (dir0 = '2014' AND dir1 = 'June')
-   * For (1) dirPath =  <path>/orders/2014/June
-   * For (2) there are 2 dirPaths: {<path>/orders/2013/June, 
<path>/orders/2014/June}
-   * @return The list of strings corresponding to directory paths
-   */
-  public List<String> getDirPath() {
-    List<RexNode> disjuncts = 
RelOptUtil.disjunctions(filterRel.getCondition());
-    boolean buildDisjunction = false;
-    List<RexNode> newDisjunctList = Lists.newArrayList();
-
-    for (RexNode d : disjuncts) {  // process the top-level disjuncts
-      List<RexNode> conjuncts = RelOptUtil.conjunctions(d);
-      String dirPath = EMPTY_STRING;
-      initPathComponents();
-
-      boolean buildConjunction = false;
-
-      // go through the conjuncts to identify the directory filters
-      for (RexNode c : conjuncts) {
-        currentConjunct = c;
-        SchemaPath expr = c.accept(this);
-
-        if (expr != null) {
-          logger.debug("Found directory filter: " + 
expr.getRootSegment().getPath());
-        }
-      }
-
-      String prevPath = dirNameList.get(0);
-
-      // compose the final path string
-      for (int i = 0; i < dirNameList.size(); i++) {
-        String path = dirNameList.get(i);
-        if (i > 0) {
-          prevPath = dirNameList.get(i-1);
-        }
-        // Check if both the current path and the previous path are non-empty; 
currently
-        // we will only push a dir<N> filter if dir<N-1> filter is also 
specified
-        if (!path.equals(EMPTY_STRING) && !prevPath.equals(EMPTY_STRING)) {
-          dirPath += "/" + path;
-
-          // since we are pushing this directory filter we should remove it 
from the
-          // list of conjuncts
-          RexNode thisConjunct = conjunctList.get(i);
-          conjuncts.remove(thisConjunct);
-          buildConjunction = true;
-        }
-      }
-      if (!dirPath.equals(EMPTY_STRING)) {
-        dirPathList.add(dirPath);
-      } else {
-        // If one of the disjuncts do not satisfy our criteria then we 
shouldn't apply any optimization
-        return emptyDirPathList;
-      }
-
-      if (buildConjunction) {
-        RexNode newConjunct = RexUtil.composeConjunction(builder, conjuncts, 
false);
-        newDisjunctList.add(newConjunct);
-        buildDisjunction = true;
-      }
-
-    } // for (disjuncts)
-
-    if (buildDisjunction) {
-      this.finalCondition = RexUtil.composeDisjunction(builder, 
newDisjunctList, false);
-    }
-    return dirPathList;
-  }
-
-  public RexNode getFinalCondition() {
-    return finalCondition;
-  }
-
-  @Override
-  public SchemaPath visitInputRef(RexInputRef inputRef) {
-    final int index = inputRef.getIndex();
-    final RelDataTypeField field = 
inputRel.getRowType().getFieldList().get(index);
-    if (partitionDescriptor.isPartitionName(field.getName())) {
-      dirMatch = true;
-    }
-    return FieldReference.getWithQuotedRef(field.getName());
-  }
-
-  @Override
-  public SchemaPath visitCall(RexCall call) {
-//    logger.debug("RexCall {}, {}", call);
-    final SqlSyntax syntax = call.getOperator().getSyntax();
-    switch (syntax) {
-    case BINARY:
-      if (call.getKind() == SqlKind.EQUALS) {
-        dirMatch = false;
-        // TODO: an assumption here is that the binary predicate is of the 
form <column> = <value>.
-        // In order to handle predicates of the form '<value> = <column>' we 
would need to canonicalize
-        // the predicate first before calling this function.
-        SchemaPath e1 = call.getOperands().get(0).accept(this);
-
-        if (dirMatch && e1 != null) {
-          // get the index for the 'dir<N>' filter
-          String dirName = e1.getRootSegment().getPath();
-          int hierarychyIndex = 
partitionDescriptor.getPartitionHierarchyIndex(dirName);
-
-          if (hierarychyIndex >= partitionDescriptor.getMaxHierarchyLevel()) {
-            return null;
-          }
-
-          // SchemaPath e2 = call.getOperands().get(1).accept(this);
-          if (call.getOperands().get(1).getKind() == SqlKind.LITERAL) {
-            String e2 = 
((RexLiteral)call.getOperands().get(1)).getValue2().toString();
-            dirNameList.set(hierarychyIndex, e2);
-            // dirNameList.set(suffixIndex, e2.getRootSegment().getPath());
-            conjunctList.set(hierarychyIndex, currentConjunct);
-            return e1;
-          }
-        }
-      }
-
-      return null;
-
-    case SPECIAL:
-      switch(call.getKind()) {
-      case CAST:
-        return getInputFromCast(call);
-
-      default:
-
-      }
-      // fall through
-    default:
-      // throw new AssertionError("Unexpected expression");
-
-    }
-    return null;
-  }
-
-  private SchemaPath getInputFromCast(RexCall call){
-    SchemaPath arg = call.getOperands().get(0).accept(this);
-    if (dirMatch) {
-      return arg;
-    }
-    return null;
-  }
-
-  @Override
-  public SchemaPath visitLocalRef(RexLocalRef localRef) {
-    return null;
-  }
-
-  @Override
-  public SchemaPath visitOver(RexOver over) {
-    return null;
-  }
-
-  @Override
-  public SchemaPath visitCorrelVariable(RexCorrelVariable correlVariable) {
-    return null;
-  }
-
-  @Override
-  public SchemaPath visitDynamicParam(RexDynamicParam dynamicParam) {
-    return null;
-  }
-
-  @Override
-  public SchemaPath visitRangeRef(RexRangeRef rangeRef) {
-    return null;
-  }
-
-  @Override
-  public SchemaPath visitFieldAccess(RexFieldAccess fieldAccess) {
-    return super.visitFieldAccess(fieldAccess);
-  }
-
-  @Override
-  public SchemaPath visitLiteral(RexLiteral literal) {
-    return FieldReference.getWithQuotedRef(literal.getValue2().toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
deleted file mode 100644
index 811eef1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.planner.logical;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.physical.base.FileGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-
-import com.google.common.collect.Lists;
-
-public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule {
-
-  public static final RelOptRule FILTER_ON_PROJECT =
-    new DrillPushPartitionFilterIntoScan(
-        RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
-        "DrillPushPartitionFilterIntoScan:Filter_On_Project") {
-
-    @Override
-      public boolean matches(RelOptRuleCall call) {
-        final DrillScanRel scan = (DrillScanRel) call.rel(2);
-        GroupScan groupScan = scan.getGroupScan();
-        // this rule is applicable only for dfs based partition pruning
-        return groupScan instanceof FileGroupScan && 
groupScan.supportsPartitionFilterPushdown();
-      }
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-      final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
-      final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
-      doOnMatch(call, filterRel, projectRel, scanRel);
-    }
-  };
-
-  public static final RelOptRule FILTER_ON_SCAN =
-      new DrillPushPartitionFilterIntoScan(
-          RelOptHelper.some(DrillFilterRel.class, 
RelOptHelper.any(DrillScanRel.class)),
-          "DrillPushPartitionFilterIntoScan:Filter_On_Scan") {
-
-      @Override
-        public boolean matches(RelOptRuleCall call) {
-          final DrillScanRel scan = (DrillScanRel) call.rel(1);
-          GroupScan groupScan = scan.getGroupScan();
-          // this rule is applicable only for dfs based partition pruning
-          return groupScan instanceof FileGroupScan && 
groupScan.supportsPartitionFilterPushdown();
-        }
-
-      @Override
-      public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
-        doOnMatch(call, filterRel, null, scanRel);
-      }
-    };
-
-  private DrillPushPartitionFilterIntoScan(
-      RelOptRuleOperand operand,
-      String id) {
-    super(operand, id);
-  }
-
-  private FormatSelection splitFilter(FormatSelection origSelection, 
DirPathBuilder builder) {
-
-    List<String> origFiles = origSelection.getAsFiles();
-    String pathPrefix = origSelection.getSelection().selectionRoot;
-
-    List<String> newFiles = Lists.newArrayList();
-
-    List<String> dirPathList = builder.getDirPath();
-
-    for (String dirPath : dirPathList) {
-      String fullPath = pathPrefix + dirPath;
-      // check containment of this path in the list of files
-      for (String origFilePath : origFiles) {
-        String origFileName = 
PartitionPruningUtil.truncatePrefixFromPath(origFilePath);
-
-        if (origFileName.startsWith(fullPath)) {
-          newFiles.add(origFileName);
-        }
-      }
-    }
-
-    if (newFiles.size() > 0) {
-      FileSelection newFileSelection = new FileSelection(newFiles, 
origSelection.getSelection().selectionRoot, true);
-      FormatSelection newFormatSelection = new 
FormatSelection(origSelection.getFormat(), newFileSelection);
-      return newFormatSelection;
-    }
-
-    return origSelection;
-  }
-
-  protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, 
DrillProjectRel projectRel, DrillScanRel scanRel) {
-    DrillRel inputRel = projectRel != null ? projectRel : scanRel;
-
-    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, 
filterRel.getCluster().getRexBuilder(), new 
FileSystemPartitionDescriptor(settings, scanRel));
-
-    FormatSelection origSelection = 
(FormatSelection)scanRel.getDrillTable().getSelection();
-    FormatSelection newSelection = splitFilter(origSelection, builder);
-
-    if (origSelection == newSelection) {
-      return; // no directory filter was pushed down
-    }
-
-    try {
-      FileGroupScan fgscan = 
((FileGroupScan)scanRel.getGroupScan()).clone(newSelection.getSelection());
-      PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, 
fgscan, builder);
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e) ;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java
deleted file mode 100644
index 05ccfb9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rex.RexNode;
-
-public class PartitionPruningUtil {
-  public static void rewritePlan(RelOptRuleCall call, DrillFilterRel 
filterRel, DrillProjectRel projectRel, DrillScanRel scanRel, GroupScan newScan, 
DirPathBuilder builder) {
-    RexNode origFilterCondition = filterRel.getCondition();
-    RexNode newFilterCondition = builder.getFinalCondition();
-
-    if (newFilterCondition.isAlwaysTrue()) {
-
-      final DrillScanRel newScanRel =
-          new DrillScanRel(scanRel.getCluster(),
-              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scanRel.getTable(),
-              newScan,
-              scanRel.getRowType(),
-              scanRel.getColumns());
-
-      if (projectRel != null) {
-        DrillProjectRel newProjectRel = new 
DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(),
-            newScanRel, projectRel.getProjects(), filterRel.getRowType());
-
-        call.transformTo(newProjectRel);
-      } else {
-        call.transformTo(newScanRel);
-      }
-    } else {
-      DrillRel inputRel;
-      final DrillScanRel newScanRel =
-          new DrillScanRel(scanRel.getCluster(),
-              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scanRel.getTable(),
-              newScan,
-              scanRel.getRowType(),
-              scanRel.getColumns());
-      if (projectRel != null) {
-        DrillProjectRel newProjectRel = new 
DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(),
-            newScanRel, projectRel.getProjects(), projectRel.getRowType());
-        inputRel = newProjectRel;
-      } else {
-        inputRel = newScanRel;
-      }
-      final DrillFilterRel newFilterRel = new 
DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(),
-          inputRel, origFilterCondition /* for now keep the original condition 
until we add more test coverage */);
-
-      call.transformTo(newFilterRel);
-    }
-  }
-
-  public static String truncatePrefixFromPath(String fileName) {
-    String pathPrefixComponent[] = fileName.split(":", 2);
-    if (pathPrefixComponent.length == 1) {
-      return pathPrefixComponent[0];
-    } else {
-      return pathPrefixComponent[1];
-    }
-  }
-}
