DRILL-3121: Add support for interpreter-based partition pruning for Hive tables. Remove the old partition pruning logic.
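In outline: Hive partition pruning now goes through the common interpreter-based PruneScanRule framework. HivePartitionDescriptor supplies partition locations and typed partition-value vectors, and the new HivePartitionLocation parses partition values out of each partition's storage path. A minimal, self-contained sketch of that path parsing (hypothetical class name and locations; it assumes '/' as the separator, where the committed code uses the platform file.separator):

import java.util.Arrays;

public class PartitionPathParseSketch {
  // Mirrors the parsing in HivePartitionLocation: each partition key is a
  // "name=value" directory segment under the table's base location.
  static String[] parsePartitionValues(String baseLocation, String partitionLocation, int maxLevels) {
    String[] values = new String[maxLevels];
    String postPath = partitionLocation.substring(
        partitionLocation.indexOf(baseLocation) + baseLocation.length());
    if (postPath.isEmpty()) {
      return values;
    }
    if (postPath.startsWith("/")) {
      postPath = postPath.substring(1);
    }
    String[] dirs = postPath.split("/");
    for (int i = 0; i < dirs.length && i < maxLevels; i++) {
      values[i] = dirs[i].substring(dirs[i].indexOf('=') + 1); // keep the value right of '='
    }
    return values;
  }

  public static void main(String[] args) {
    // Hypothetical locations, for illustration only.
    String base = "/user/hive/warehouse/partition_pruning_test";
    String part = base + "/c=1/d=2";
    System.out.println(Arrays.toString(parsePartitionValues(base, part, 2))); // prints [1, 2]
  }
}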
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6d5d7cc2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6d5d7cc2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6d5d7cc2

Branch: refs/heads/master
Commit: 6d5d7cc27140963291e76eac72218b862aa6af45
Parents: 95d576d
Author: Mehant Baid <meha...@gmail.com>
Authored: Sat Jul 25 00:28:55 2015 -0700
Committer: Mehant Baid <meha...@gmail.com>
Committed: Thu Jul 30 20:23:52 2015 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    | 171 ++++++++++--
 .../exec/planner/sql/HivePartitionLocation.java |  54 ++++
 .../HivePushPartitionFilterIntoScan.java        | 150 ++++-------
 .../exec/store/hive/HiveDataTypeUtility.java    | 122 +++++++++
 .../drill/exec/store/hive/HiveRecordReader.java |  48 +---
 .../exec/store/hive/HiveStoragePlugin.java      |   2 +-
 .../apache/drill/exec/store/hive/HiveTable.java |  22 +-
 .../drill/exec/TestHivePartitionPruning.java    |  28 +-
 .../exec/planner/logical/DirPathBuilder.java    | 265 -------------------
 .../DrillPushPartitionFilterIntoScan.java       | 139 ----------
 .../planner/logical/PartitionPruningUtil.java   |  78 ------
 11 files changed, 424 insertions(+), 655 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index 93bfadd..d323db9 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -17,29 +17,54 @@ */ package org.apache.drill.exec.planner.sql; +import org.apache.calcite.util.BitSets; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.PartitionLocation; +import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.store.hive.HiveDataTypeUtility; +import org.apache.drill.exec.store.hive.HiveReadEntry; +import org.apache.drill.exec.store.hive.HiveScan; import org.apache.drill.exec.store.hive.HiveTable; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableDateVector; +import org.apache.drill.exec.vector.NullableFloat4Vector; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableTimeStampVector; +import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; +import 
java.sql.Timestamp; import java.util.BitSet; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + // Partition descriptor for hive tables public class HivePartitionDescriptor implements PartitionDescriptor { private final Map<String, Integer> partitionMap = new HashMap<>(); private final int MAX_NESTED_SUBDIRS; + private final DrillScanRel scanRel; - public HivePartitionDescriptor(List<HiveTable.FieldSchemaWrapper> partitionName) { + public HivePartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { int i = 0; - for (HiveTable.FieldSchemaWrapper wrapper : partitionName) { + this.scanRel = scanRel; + for (HiveTable.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) { partitionMap.put(wrapper.name, i); i++; } @@ -61,37 +86,145 @@ public class HivePartitionDescriptor implements PartitionDescriptor { return MAX_NESTED_SUBDIRS; } - @Override - public Integer getIdIfValid(String name) { - return partitionMap.get(name); - } - - /* - * Following method stubs are just added to satisfy the interface implementation. - * Actual implementation will be added when hive partition pruning is plugged in - * as part of DRILL-3121 - */ - private String getBaseTableLocation() { - return null; + public String getBaseTableLocation() { + HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry; + return origEntry.table.getTable().getSd().getLocation(); } @Override - public GroupScan createNewGroupScan(List<String> newFiles) throws Exception { - return null; + public GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException { + HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); + HiveReadEntry origReadEntry = hiveScan.hiveReadEntry; + List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions; + List<HiveTable.HivePartition> newPartitions = new LinkedList<>(); + + for (HiveTable.HivePartition part: oldPartitions) { + String partitionLocation = part.getPartition().getSd().getLocation(); + for (String newPartitionLocation: newFiles) { + if (partitionLocation.equals(newPartitionLocation)) { + newPartitions.add(part); + } + } + } + + HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions, origReadEntry.hiveConfigOverride); + HiveScan newScan = new HiveScan(hiveScan.getUserName(), newReadEntry, hiveScan.storagePlugin, hiveScan.columns); + return newScan; } @Override public List<PartitionLocation> getPartitions() { - return null; + List<PartitionLocation> partitions = new LinkedList<>(); + HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry; + List<String> allFileLocations = new LinkedList<>(); + for (Partition partition: origEntry.getPartitions()) { + allFileLocations.add(partition.getSd().getLocation()); + } + for (String file: allFileLocations) { + partitions.add(new HivePartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(),file)); + } + return partitions; } @Override - public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { + public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, + BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { + int record = 0; + for(PartitionLocation partitionLocation: partitions) 
{ + for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){ + populateVector(vectors[partitionColumnIndex], partitionLocation.getPartitionValue(partitionColumnIndex), + record); + } + record++; + } + for(ValueVector v : vectors) { + if(v == null){ + continue; + } + v.getMutator().setValueCount(partitions.size()); + } } @Override public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { - return null; + HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); + String partitionName = column.getAsNamePart().getName(); + Map<String, String> partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap(); + String hiveType = partitionNameTypeMap.get(partitionName); + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hiveType); + + TypeProtos.MinorType partitionType = HiveDataTypeUtility.getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo, + plannerSettings.getOptions()); + return TypeProtos.MajorType.newBuilder().setMode(TypeProtos.DataMode.OPTIONAL).setMinorType(partitionType).build(); + } + + @Override + public Integer getIdIfValid(String name) { + return partitionMap.get(name); + } + + public static void populateVector(ValueVector vector, String value, int record) { + TypeProtos.MinorType type = vector.getField().getType().getMinorType(); + + switch (type) { + case TINYINT: + case SMALLINT: + case INT: + if (value == null) { + ((NullableIntVector) vector).getMutator().setNull(record); + } else { + ((NullableIntVector) vector).getMutator().setSafe(record, Integer.parseInt(value)); + } + break; + + case BIGINT: + if (value == null) { + ((NullableBigIntVector) vector).getMutator().setNull(record); + } else { + ((NullableBigIntVector) vector).getMutator().setSafe(record, Long.parseLong(value)); + } + break; + case FLOAT4: + if (value == null) { + ((NullableFloat4Vector) vector).getMutator().setNull(record); + } else { + ((NullableFloat4Vector) vector).getMutator().setSafe(record, Float.parseFloat(value)); + } + break; + case FLOAT8: + if (value == null) { + ((NullableFloat8Vector) vector).getMutator().setNull(record); + } else { + ((NullableFloat8Vector) vector).getMutator().setSafe(record, Double.parseDouble(value)); + } + break; + case TIMESTAMP: + if (value == null) { + ((NullableTimeStampVector) vector).getMutator().setNull(record); + } else { + DateTimeFormatter f = DateUtility.getDateTimeFormatter(); + value = value.replace("%3A", ":"); + long ts = DateTime.parse(value, f).withZoneRetainFields(DateTimeZone.UTC).getMillis(); + ((NullableTimeStampVector) vector).getMutator().set(record, ts); + } + break; + case DATE: + if (value == null) { + ((NullableDateVector) vector).getMutator().setNull(record); + } else { + DateTimeFormatter f = DateUtility.formatDate; + long ts = DateTime.parse(value, f).withZoneRetainFields(DateTimeZone.UTC).getMillis(); + ((NullableDateVector) vector).getMutator().set(record, ts); + } + break; + case VARCHAR: + if (value == null) { + ((NullableVarCharVector) vector).getMutator().setNull(record); + } else { + ((NullableVarCharVector) vector).getMutator().set(record, value.getBytes()); + } + break; + } } } http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java new file mode 100644 index 0000000..e3066a4 --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql; + +import org.apache.drill.exec.planner.PartitionLocation; + +public class HivePartitionLocation implements PartitionLocation { + private final String partitionLocation; + private final String[] partitionValue; + private final String fileSeparator = System.getProperty("file.separator"); + public HivePartitionLocation(int max, String baseTableLocation, String entireLocation) { + this.partitionLocation = entireLocation; + partitionValue = new String[max]; + int start = partitionLocation.indexOf(baseTableLocation) + baseTableLocation.length(); + String postPath = entireLocation.substring(start); + if (postPath.length() == 0) { + return; + } + if (postPath.startsWith(fileSeparator)) { + postPath = postPath.substring(postPath.indexOf(fileSeparator) + 1); + } + String[] mostDirs = postPath.split(fileSeparator); + assert mostDirs.length <= max; + for (int i = 0; i < mostDirs.length; i++) { + this.partitionValue[i] = mostDirs[i].substring(mostDirs[i].indexOf("=") + 1); + } + + } + @Override + public String getPartitionValue(int index) { + assert index < partitionValue.length; + return partitionValue[index]; + } + + @Override + public String getEntirePartitionLocation() { + return partitionLocation; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java index 6ab1a78..90b0c5f 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java @@ -18,125 +18,75 @@ package org.apache.drill.exec.planner.sql.logical; -import java.util.LinkedList; -import java.util.List; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.GroupScan; -import 
org.apache.drill.exec.planner.logical.DirPathBuilder; +import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.logical.DrillFilterRel; import org.apache.drill.exec.planner.logical.DrillProjectRel; -import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScanRel; -import org.apache.drill.exec.planner.logical.PartitionPruningUtil; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.logical.partition.PruneScanRule; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.sql.HivePartitionDescriptor; import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.drill.exec.store.hive.HiveReadEntry; import org.apache.drill.exec.store.hive.HiveScan; -import org.apache.drill.exec.store.hive.HiveTable; -import org.apache.drill.exec.store.hive.HiveTable.HivePartition; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptRuleOperand; - -import com.google.common.collect.Lists; - -public abstract class HivePushPartitionFilterIntoScan extends StoragePluginOptimizerRule { - - public static final StoragePluginOptimizerRule HIVE_FILTER_ON_PROJECT = - new HivePushPartitionFilterIntoScan( - RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), - "HivePushPartitionFilterIntoScan:Filter_On_Project") { - @Override - public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(2); - GroupScan groupScan = scan.getGroupScan(); - return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown(); - } +public abstract class HivePushPartitionFilterIntoScan { - @Override - public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); - final DrillScanRel scanRel = (DrillScanRel) call.rel(2); - doOnMatch(call, filterRel, projectRel, scanRel); - } - }; + public static final StoragePluginOptimizerRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) { + return new PruneScanRule( + RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), + "HivePushPartitionFilterIntoScan:Filter_On_Project_Hive", + optimizerRulesContext) { - public static final StoragePluginOptimizerRule HIVE_FILTER_ON_SCAN = - new HivePushPartitionFilterIntoScan( - RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), - "HivePushPartitionFilterIntoScan:Filter_On_Scan") { - - @Override - public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(1); - GroupScan groupScan = scan.getGroupScan(); - return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown(); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillScanRel scanRel = (DrillScanRel) call.rel(1); - doOnMatch(call, filterRel, null, scanRel); - } - }; - - private HivePushPartitionFilterIntoScan( - RelOptRuleOperand operand, - String id) { - super(operand, id); - } - - private HiveReadEntry splitFilter(HiveReadEntry origReadEntry, DirPathBuilder builder) { - HiveTable table = origReadEntry.table; - List<HivePartition> partitions = origReadEntry.partitions; - List<HivePartition> newPartitions = new 
LinkedList<>(); - String pathPrefix = PartitionPruningUtil.truncatePrefixFromPath(table.getTable().getSd().getLocation()); - List<String> newFiles = Lists.newArrayList(); - List<String> dirPathList = builder.getDirPath(); - - for (String dirPath : dirPathList) { - String fullPath = pathPrefix + dirPath; - // check containment of this path in the list of files - for (HivePartition part: partitions) { - String origFilePath = origReadEntry.getPartitionLocation(part); - String origFileName = PartitionPruningUtil.truncatePrefixFromPath(origFilePath); + @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + return new HivePartitionDescriptor(settings, scanRel); + } - if (origFileName.startsWith(fullPath)) { - newFiles.add(origFileName); - newPartitions.add(part); - } + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(2); + GroupScan groupScan = scan.getGroupScan(); + // this rule is applicable only for Hive based partition pruning + return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown(); } - } - if (newFiles.size() > 0) { - HiveReadEntry newReadEntry = new HiveReadEntry(table, newPartitions, origReadEntry.hiveConfigOverride); - return newReadEntry; - } - return origReadEntry; + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); + final DrillScanRel scanRel = (DrillScanRel) call.rel(2); + doOnMatch(call, filterRel, projectRel, scanRel); + } + }; } - protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) { - DrillRel inputRel = projectRel != null ? 
projectRel : scanRel; - HiveReadEntry origReadEntry = ((HiveScan)scanRel.getGroupScan()).hiveReadEntry; - DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new HivePartitionDescriptor(origReadEntry.table.partitionKeys)); - HiveReadEntry newReadEntry = splitFilter(origReadEntry, builder); + public static final StoragePluginOptimizerRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) { + return new PruneScanRule( + RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), + "HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) { - if (origReadEntry == newReadEntry) { - return; // no directory filter was pushed down - } + @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + return new HivePartitionDescriptor(settings, scanRel); + } - try { - HiveScan oldScan = (HiveScan) scanRel.getGroupScan(); - HiveScan hiveScan = new HiveScan(oldScan.getUserName(), newReadEntry, oldScan.storagePlugin, oldScan.columns); - PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder); - } catch (ExecutionSetupException e) { - throw new DrillRuntimeException(e); - } + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(1); + GroupScan groupScan = scan.getGroupScan(); + // this rule is applicable only for Hive based partition pruning + return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown(); + } + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillScanRel scanRel = (DrillScanRel) call.rel(1); + doOnMatch(call, filterRel, null, scanRel); + } + }; } } http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java new file mode 100644 index 0000000..84d8790 --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDataTypeUtility.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.expr.holders.Decimal18Holder; +import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal9Holder; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.util.DecimalUtility; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.BitVector; +import org.apache.drill.exec.vector.DateVector; +import org.apache.drill.exec.vector.Decimal18Vector; +import org.apache.drill.exec.vector.Decimal28SparseVector; +import org.apache.drill.exec.vector.Decimal38SparseVector; +import org.apache.drill.exec.vector.Decimal9Vector; +import org.apache.drill.exec.vector.Float4Vector; +import org.apache.drill.exec.vector.Float8Vector; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.TimeStampVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.exec.work.ExecErrorConstants; + +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; + +public class HiveDataTypeUtility { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDataTypeUtility.class); + + public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, + OptionManager options) { + switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BINARY: + return TypeProtos.MinorType.VARBINARY; + case BOOLEAN: + return TypeProtos.MinorType.BIT; + case DECIMAL: { + + if (options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == false) { + throw UserException.unsupportedError() + .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG) + .build(logger); + } + DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo; + return DecimalUtility.getDecimalDataType(decimalTypeInfo.precision()); + } + case DOUBLE: + return TypeProtos.MinorType.FLOAT8; + case FLOAT: + return TypeProtos.MinorType.FLOAT4; + // TODO (DRILL-2470) + // Byte and short (tinyint and smallint in SQL types) are currently read as integers + // as these smaller integer types are not fully supported in Drill 
today. + case SHORT: + case BYTE: + case INT: + return TypeProtos.MinorType.INT; + case LONG: + return TypeProtos.MinorType.BIGINT; + case STRING: + case VARCHAR: + return TypeProtos.MinorType.VARCHAR; + case TIMESTAMP: + return TypeProtos.MinorType.TIMESTAMP; + case DATE: + return TypeProtos.MinorType.DATE; + } + throwUnsupportedHiveDataTypeError(primitiveTypeInfo.getPrimitiveCategory().toString()); + return null; + } + + public static void throwUnsupportedHiveDataTypeError(String unsupportedType) { + StringBuilder errMsg = new StringBuilder(); + errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType)); + errMsg.append(System.getProperty("line.separator")); + errMsg.append("Following Hive data types are supported in Drill for querying: "); + errMsg.append( + "BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, and VARCHAR"); + + throw new RuntimeException(errMsg.toString()); + } +} + http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index 088fb74..1a66ad9 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -126,7 +126,7 @@ public class HiveRecordReader extends AbstractRecordReader { protected static final int FIELD_SIZE = 50; public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, - FragmentContext context, Map<String, String> hiveConfigOverride) throws ExecutionSetupException { + FragmentContext context, Map<String, String> hiveConfigOverride) throws ExecutionSetupException { this.table = table; this.partition = partition; this.inputSplit = inputSplit; @@ -353,53 +353,11 @@ public class HiveRecordReader extends AbstractRecordReader { } } - private MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo) { - switch(primitiveTypeInfo.getPrimitiveCategory()) { - case BINARY: - return TypeProtos.MinorType.VARBINARY; - case BOOLEAN: - return MinorType.BIT; - case DECIMAL: { - - if (context.getOptions().getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == false) { - throw UserException.unsupportedError() - .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG) - .build(logger); - } - DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo; - return DecimalUtility.getDecimalDataType(decimalTypeInfo.precision()); - } - case DOUBLE: - return MinorType.FLOAT8; - case FLOAT: - return MinorType.FLOAT4; - // TODO (DRILL-2470) - // Byte and short (tinyint and smallint in SQL types) are currently read as integers - // as these smaller integer types are not fully supported in Drill today. 
- case SHORT: - case BYTE: - case INT: - return MinorType.INT; - case LONG: - return MinorType.BIGINT; - case STRING: - case VARCHAR: - return MinorType.VARCHAR; - case TIMESTAMP: - return MinorType.TIMESTAMP; - case DATE: - return MinorType.DATE; - } - - throwUnsupportedHiveDataTypeError(primitiveTypeInfo.getPrimitiveCategory().toString()); - return null; - } - public MajorType getMajorTypeFromHiveTypeInfo(TypeInfo typeInfo, boolean nullable) { switch (typeInfo.getCategory()) { case PRIMITIVE: { PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; - MinorType minorType = getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo); + MinorType minorType = HiveDataTypeUtility.getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo, context.getOptions()); MajorType.Builder typeBuilder = MajorType.newBuilder().setMinorType(minorType) .setMode((nullable ? DataMode.OPTIONAL : DataMode.REQUIRED)); @@ -526,7 +484,7 @@ public class HiveRecordReader extends AbstractRecordReader { } private void populateDecimalPartitionVector(DecimalTypeInfo typeInfo, ValueVector vector, BigDecimal bigDecimal, - int recordCount) { + int recordCount) { int precision = typeInfo.precision(); int scale = typeInfo.scale(); if (precision <= 9) { http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index 4813c5e..22552b7 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -89,7 +89,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { @Override public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerRulesContext) { - return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN); + return ImmutableSet.of(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerRulesContext), HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerRulesContext)); } } http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java index 99101cc..88fe8c3 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.hive; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,12 +65,15 @@ public class HiveTable { @JsonProperty public String tableType; + @JsonIgnore + public final Map<String, String> partitionNameTypeMap = new HashMap(); + @JsonCreator public HiveTable(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, @JsonProperty("createTime") int createTime, 
@JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd, @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters, @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText, @JsonProperty("tableType") String tableType - ) { + ) { this.tableName = tableName; this.dbName = dbName; this.owner = owner; @@ -86,10 +90,11 @@ public class HiveTable { List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList(); for (FieldSchemaWrapper w : partitionKeys) { partitionKeysUnwrapped.add(w.getFieldSchema()); + partitionNameTypeMap.put(w.name, w.type); } StorageDescriptor sdUnwrapped = sd.getSd(); this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped, - parameters, viewOriginalText, viewExpandedText, tableType); + parameters, viewOriginalText, viewExpandedText, tableType); } public HiveTable(Table table) { @@ -107,6 +112,7 @@ public class HiveTable { this.partitionKeys = Lists.newArrayList(); for (FieldSchema f : table.getPartitionKeys()) { this.partitionKeys.add(new FieldSchemaWrapper(f)); + partitionNameTypeMap.put(f.getName(), f.getType()); } this.parameters = table.getParameters(); this.viewOriginalText = table.getViewOriginalText(); @@ -156,8 +162,8 @@ public class HiveTable { @JsonCreator public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime, - @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd, - @JsonProperty("parameters") Map<String, String> parameters + @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd, + @JsonProperty("parameters") Map<String, String> parameters ) { this.values = values; this.tableName = tableName; @@ -217,7 +223,7 @@ public class HiveTable { public int numBuckets; @JsonProperty public SerDeInfoWrapper serDeInfo; -// @JsonProperty + // @JsonProperty // public List<String> bucketCols; @JsonProperty public List<OrderWrapper> sortCols; @@ -251,7 +257,7 @@ public class HiveTable { // this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped, // bucketCols, sortColsUnwrapped, parameters); this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped, - null, sortColsUnwrapped, parameters); + null, sortColsUnwrapped, parameters); } public StorageDescriptorWrapper(StorageDescriptor sd) { @@ -369,4 +375,8 @@ public class HiveTable { } } + public Map<String, String> getPartitionNameTypeMap() { + return partitionNameTypeMap; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java index c846328..0ea9d53 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java +++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java @@ -37,7 +37,7 @@ public class TestHivePartitionPruning extends HiveTestBase { /* Partition pruning is not supported for disjuncts that do not meet pruning criteria. * Will be enabled when we can do wild card comparison for partition pruning */ - @Ignore + @Test public void testDisjunctsPartitionFilter() throws Exception { final String query = "explain plan for select * from hive.`default`.partition_pruning_test where (c = 1) or (d = 1)"; final String plan = getPlanInString(query, OPTIQ_FORMAT); @@ -55,7 +55,7 @@ public class TestHivePartitionPruning extends HiveTestBase { assertFalse(plan.contains("Filter")); } - @Ignore("DRILL-1571") + @Test public void testComplexFilter() throws Exception { final String query = "explain plan for select * from hive.`default`.partition_pruning_test where (c = 1 and d = 1) or (c = 2 and d = 3)"; final String plan = getPlanInString(query, OPTIQ_FORMAT); @@ -63,4 +63,28 @@ public class TestHivePartitionPruning extends HiveTestBase { // Check and make sure that Filter is not present in the plan assertFalse(plan.contains("Filter")); } + + @Test + public void testRangeFilter() throws Exception { + final String query = "explain plan for " + + "select * from hive.`default`.partition_pruning_test where " + + "c > 1 and d > 1"; + + final String plan = getPlanInString(query, OPTIQ_FORMAT); + + // Check and make sure that Filter is not present in the plan + assertFalse(plan.contains("Filter")); + } + + @Test + public void testRangeFilterWithDisjunct() throws Exception { + final String query = "explain plan for " + + "select * from hive.`default`.partition_pruning_test where " + + "(c > 1 and d > 1) or (c < 2 and d < 2)"; + + final String plan = getPlanInString(query, OPTIQ_FORMAT); + + // Check and make sure that Filter is not present in the plan + assertFalse(plan.contains("Filter")); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java deleted file mode 100644 index 892e8cb..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.planner.logical; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.planner.PartitionDescriptor; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexCorrelVariable; -import org.apache.calcite.rex.RexDynamicParam; -import org.apache.calcite.rex.RexFieldAccess; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexLocalRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexOver; -import org.apache.calcite.rex.RexRangeRef; -import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlSyntax; - -import com.google.common.collect.Lists; - -public class DirPathBuilder extends RexVisitorImpl <SchemaPath> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirPathBuilder.class); - - static final String EMPTY_STRING = ""; - - final private DrillFilterRel filterRel; - final private DrillRel inputRel; - final private RexBuilder builder; - final private PartitionDescriptor partitionDescriptor; - - private List<String> dirNameList; - private List<RexNode> conjunctList; - private List<String> dirPathList = Lists.newArrayList(); - private final static List<String> emptyDirPathList = new ArrayList<>(0); - private RexNode currentConjunct = null; // the current conjunct are we evaluating during visitor traversal - private RexNode finalCondition = null; // placeholder for the final filter condition - private boolean dirMatch = false; - - public DirPathBuilder(DrillFilterRel filterRel, DrillRel inputRel, RexBuilder builder, PartitionDescriptor partitionDescriptor) { - super(true); - this.filterRel = filterRel; - this.inputRel = inputRel; - this.builder = builder; - this.finalCondition = filterRel.getCondition(); - this.partitionDescriptor = partitionDescriptor; - } - - private void initPathComponents() { - int maxHierarchy = partitionDescriptor.getMaxHierarchyLevel(); - dirNameList = Lists.newArrayListWithExpectedSize(maxHierarchy); - conjunctList = Lists.newArrayListWithExpectedSize(maxHierarchy); - for (int i=0; i < maxHierarchy; i++) { - dirNameList.add(EMPTY_STRING); - conjunctList.add(null); - } - } - - /** - * Build a directory path string for filter conditions containing directory filters. - * For example, suppose we have directory hierarchy: - * {orders/2012/Jan...Dec, orders/2013/Jan...Dec, orders/2014/Jan...Dec} - * path will be built for following types of filters (More types of filters will be added in the future): - * 1. SELECT * FROM <path>/orders WHERE o_custkey = 5 AND dir0 = '2014' AND dir1 = 'June' - * 2. 
SELECT * FROM <path>/orders WHERE (dir0 = '2013' AND dir1 = 'June') OR (dir0 = '2014' AND dir1 = 'June') - * For (1) dirPath = <path>/orders/2014/June - * For (2) there are 2 dirPaths: {<path>/orders/2013/June, <path>/orders/2014/June} - * @return The list of strings corresponding to directory paths - */ - public List<String> getDirPath() { - List<RexNode> disjuncts = RelOptUtil.disjunctions(filterRel.getCondition()); - boolean buildDisjunction = false; - List<RexNode> newDisjunctList = Lists.newArrayList(); - - for (RexNode d : disjuncts) { // process the top-level disjuncts - List<RexNode> conjuncts = RelOptUtil.conjunctions(d); - String dirPath = EMPTY_STRING; - initPathComponents(); - - boolean buildConjunction = false; - - // go through the conjuncts to identify the directory filters - for (RexNode c : conjuncts) { - currentConjunct = c; - SchemaPath expr = c.accept(this); - - if (expr != null) { - logger.debug("Found directory filter: " + expr.getRootSegment().getPath()); - } - } - - String prevPath = dirNameList.get(0); - - // compose the final path string - for (int i = 0; i < dirNameList.size(); i++) { - String path = dirNameList.get(i); - if (i > 0) { - prevPath = dirNameList.get(i-1); - } - // Check if both the current path and the previous path are non-empty; currently - // we will only push a dir<N> filter if dir<N-1> filter is also specified - if (!path.equals(EMPTY_STRING) && !prevPath.equals(EMPTY_STRING)) { - dirPath += "/" + path; - - // since we are pushing this directory filter we should remove it from the - // list of conjuncts - RexNode thisConjunct = conjunctList.get(i); - conjuncts.remove(thisConjunct); - buildConjunction = true; - } - } - if (!dirPath.equals(EMPTY_STRING)) { - dirPathList.add(dirPath); - } else { - // If one of the disjuncts do not satisfy our criteria then we shouldn't apply any optimization - return emptyDirPathList; - } - - if (buildConjunction) { - RexNode newConjunct = RexUtil.composeConjunction(builder, conjuncts, false); - newDisjunctList.add(newConjunct); - buildDisjunction = true; - } - - } // for (disjuncts) - - if (buildDisjunction) { - this.finalCondition = RexUtil.composeDisjunction(builder, newDisjunctList, false); - } - return dirPathList; - } - - public RexNode getFinalCondition() { - return finalCondition; - } - - @Override - public SchemaPath visitInputRef(RexInputRef inputRef) { - final int index = inputRef.getIndex(); - final RelDataTypeField field = inputRel.getRowType().getFieldList().get(index); - if (partitionDescriptor.isPartitionName(field.getName())) { - dirMatch = true; - } - return FieldReference.getWithQuotedRef(field.getName()); - } - - @Override - public SchemaPath visitCall(RexCall call) { -// logger.debug("RexCall {}, {}", call); - final SqlSyntax syntax = call.getOperator().getSyntax(); - switch (syntax) { - case BINARY: - if (call.getKind() == SqlKind.EQUALS) { - dirMatch = false; - // TODO: an assumption here is that the binary predicate is of the form <column> = <value>. - // In order to handle predicates of the form '<value> = <column>' we would need to canonicalize - // the predicate first before calling this function. 
- SchemaPath e1 = call.getOperands().get(0).accept(this); - - if (dirMatch && e1 != null) { - // get the index for the 'dir<N>' filter - String dirName = e1.getRootSegment().getPath(); - int hierarychyIndex = partitionDescriptor.getPartitionHierarchyIndex(dirName); - - if (hierarychyIndex >= partitionDescriptor.getMaxHierarchyLevel()) { - return null; - } - - // SchemaPath e2 = call.getOperands().get(1).accept(this); - if (call.getOperands().get(1).getKind() == SqlKind.LITERAL) { - String e2 = ((RexLiteral)call.getOperands().get(1)).getValue2().toString(); - dirNameList.set(hierarychyIndex, e2); - // dirNameList.set(suffixIndex, e2.getRootSegment().getPath()); - conjunctList.set(hierarychyIndex, currentConjunct); - return e1; - } - } - } - - return null; - - case SPECIAL: - switch(call.getKind()) { - case CAST: - return getInputFromCast(call); - - default: - - } - // fall through - default: - // throw new AssertionError("Unexpected expression"); - - } - return null; - } - - private SchemaPath getInputFromCast(RexCall call){ - SchemaPath arg = call.getOperands().get(0).accept(this); - if (dirMatch) { - return arg; - } - return null; - } - - @Override - public SchemaPath visitLocalRef(RexLocalRef localRef) { - return null; - } - - @Override - public SchemaPath visitOver(RexOver over) { - return null; - } - - @Override - public SchemaPath visitCorrelVariable(RexCorrelVariable correlVariable) { - return null; - } - - @Override - public SchemaPath visitDynamicParam(RexDynamicParam dynamicParam) { - return null; - } - - @Override - public SchemaPath visitRangeRef(RexRangeRef rangeRef) { - return null; - } - - @Override - public SchemaPath visitFieldAccess(RexFieldAccess fieldAccess) { - return super.visitFieldAccess(fieldAccess); - } - - @Override - public SchemaPath visitLiteral(RexLiteral literal) { - return FieldReference.getWithQuotedRef(literal.getValue2().toString()); - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java deleted file mode 100644 index 811eef1..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.drill.exec.planner.logical; - -import java.io.IOException; -import java.util.List; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.physical.base.FileGroupScan; -import org.apache.drill.exec.physical.base.GroupScan; -import org.apache.drill.exec.planner.FileSystemPartitionDescriptor; -import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.planner.physical.PrelUtil; -import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FormatSelection; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptRuleOperand; - -import com.google.common.collect.Lists; - -public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { - - public static final RelOptRule FILTER_ON_PROJECT = - new DrillPushPartitionFilterIntoScan( - RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), - "DrillPushPartitionFilterIntoScan:Filter_On_Project") { - - @Override - public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(2); - GroupScan groupScan = scan.getGroupScan(); - // this rule is applicable only for dfs based partition pruning - return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); - final DrillScanRel scanRel = (DrillScanRel) call.rel(2); - doOnMatch(call, filterRel, projectRel, scanRel); - } - }; - - public static final RelOptRule FILTER_ON_SCAN = - new DrillPushPartitionFilterIntoScan( - RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), - "DrillPushPartitionFilterIntoScan:Filter_On_Scan") { - - @Override - public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(1); - GroupScan groupScan = scan.getGroupScan(); - // this rule is applicable only for dfs based partition pruning - return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillScanRel scanRel = (DrillScanRel) call.rel(1); - doOnMatch(call, filterRel, null, scanRel); - } - }; - - private DrillPushPartitionFilterIntoScan( - RelOptRuleOperand operand, - String id) { - super(operand, id); - } - - private FormatSelection splitFilter(FormatSelection origSelection, DirPathBuilder builder) { - - List<String> origFiles = origSelection.getAsFiles(); - String pathPrefix = origSelection.getSelection().selectionRoot; - - List<String> newFiles = Lists.newArrayList(); - - List<String> dirPathList = builder.getDirPath(); - - for (String dirPath : dirPathList) { - String fullPath = pathPrefix + dirPath; - // check containment of this path in the list of files - for (String origFilePath : origFiles) { - String origFileName = PartitionPruningUtil.truncatePrefixFromPath(origFilePath); - - if (origFileName.startsWith(fullPath)) { - newFiles.add(origFileName); - } - } - } - - if (newFiles.size() > 0) { - FileSelection newFileSelection = new FileSelection(newFiles, origSelection.getSelection().selectionRoot, true); - FormatSelection newFormatSelection = new 
FormatSelection(origSelection.getFormat(), newFileSelection); - return newFormatSelection; - } - - return origSelection; - } - - protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) { - DrillRel inputRel = projectRel != null ? projectRel : scanRel; - - PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); - DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new FileSystemPartitionDescriptor(settings, scanRel)); - - FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection(); - FormatSelection newSelection = splitFilter(origSelection, builder); - - if (origSelection == newSelection) { - return; // no directory filter was pushed down - } - - try { - FileGroupScan fgscan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection.getSelection()); - PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, fgscan, builder); - } catch (IOException e) { - throw new DrillRuntimeException(e) ; - } - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/6d5d7cc2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java deleted file mode 100644 index 05ccfb9..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.planner.logical; - -import org.apache.drill.exec.physical.base.GroupScan; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.rex.RexNode; - -public class PartitionPruningUtil { - public static void rewritePlan(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel, GroupScan newScan, DirPathBuilder builder) { - RexNode origFilterCondition = filterRel.getCondition(); - RexNode newFilterCondition = builder.getFinalCondition(); - - if (newFilterCondition.isAlwaysTrue()) { - - final DrillScanRel newScanRel = - new DrillScanRel(scanRel.getCluster(), - scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), - newScan, - scanRel.getRowType(), - scanRel.getColumns()); - - if (projectRel != null) { - DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(), - newScanRel, projectRel.getProjects(), filterRel.getRowType()); - - call.transformTo(newProjectRel); - } else { - call.transformTo(newScanRel); - } - } else { - DrillRel inputRel; - final DrillScanRel newScanRel = - new DrillScanRel(scanRel.getCluster(), - scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), - newScan, - scanRel.getRowType(), - scanRel.getColumns()); - if (projectRel != null) { - DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(), - newScanRel, projectRel.getProjects(), projectRel.getRowType()); - inputRel = newProjectRel; - } else { - inputRel = newScanRel; - } - final DrillFilterRel newFilterRel = new DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(), - inputRel, origFilterCondition /* for now keep the original condition until we add more test coverage */); - - call.transformTo(newFilterRel); - } - } - - public static String truncatePrefixFromPath(String fileName) { - String pathPrefixComponent[] = fileName.split(":", 2); - if (pathPrefixComponent.length == 1) { - return pathPrefixComponent[0]; - } else { - return pathPrefixComponent[1]; - } - } -}
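A note on the TIMESTAMP branch of HivePartitionDescriptor.populateVector above: Hive percent-encodes ':' in partition directory names, so the value is decoded ("%3A" back to ":") before parsing, and the parsed timestamp is pinned to UTC. A standalone sketch of that decoding (the pattern string here is an assumption for illustration; the committed code takes its formatter from DateUtility.getDateTimeFormatter()):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TimestampPartitionValueSketch {
  public static void main(String[] args) {
    // A timestamp partition value as it would appear in a directory name,
    // with ':' percent-encoded by Hive.
    String raw = "2015-07-25 00%3A28%3A55.0";
    String decoded = raw.replace("%3A", ":");
    // Assumed pattern for this sketch; the commit obtains its formatter
    // from DateUtility.getDateTimeFormatter().
    DateTimeFormatter f = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S");
    long millis = DateTime.parse(decoded, f).withZoneRetainFields(DateTimeZone.UTC).getMillis();
    System.out.println(millis); // epoch millis for 2015-07-25 00:28:55.0 UTC
  }
}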