DRILL-631: Do not push projects down into the scan if the scan does not support it. DRILL-631: More work.
DRILL-631: Groupscan by default does not support project pushdown into scan. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e790e796 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e790e796 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e790e796 Branch: refs/heads/master Commit: e790e79622f3b835ca6694d94435b3397a630725 Parents: 0b9893c Author: Jinfeng Ni <[email protected]> Authored: Sun May 4 12:12:18 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue May 6 14:49:29 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/hbase/HBaseGroupScan.java | 8 +++- .../exec/physical/base/AbstractGroupScan.java | 13 ++++++- .../drill/exec/physical/base/GroupScan.java | 15 ++++++-- .../planner/logical/DrillPushProjIntoScan.java | 40 +++++++++++++++----- .../exec/planner/logical/DrillRuleSets.java | 8 ++-- .../exec/planner/logical/DrillScanRel.java | 17 +++++++-- .../drill/exec/planner/physical/PrelUtil.java | 25 +++++++----- .../drill/exec/planner/physical/ScanPrule.java | 5 +-- .../planner/sql/handlers/ExplainHandler.java | 4 ++ .../exec/store/dfs/easy/EasyGroupScan.java | 39 +++++++++++-------- .../exec/store/direct/DirectGroupScan.java | 2 +- .../exec/store/ischema/InfoSchemaGroupScan.java | 15 ++++---- .../exec/store/parquet/ParquetGroupScan.java | 31 ++++++++------- .../org/apache/drill/TestProjectPushDown.java | 2 +- 14 files changed, 150 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 21021d3..bcdebc3 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -118,7 +118,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst this.storagePlugin = that.storagePlugin; this.storagePluginConfig = that.storagePluginConfig; } - + private void getRegionInfos() { logger.debug("Getting region locations"); try { @@ -262,4 +262,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst newScan.columns = columns; return newScan; } + + @Override + public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { + return columns; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index f4cee2a..cd78bc1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -28,7 +28,7 @@ import com.google.common.collect.Iterators; public abstract class AbstractGroupScan extends AbstractBase implements GroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class); - + @Override public Iterator<PhysicalOperator> iterator() { return Iterators.emptyIterator(); @@ -60,4 +60,15 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca public long getMaxAllocation() { return 0; } + + /** + * Check if 
groupscan can support projects-push-down into scan. + * The default implementation assumes groupscan could not support project pushdown, by returning null. + * If one particular group scan can support, it should override this method. + */ + @Override + public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { + return null; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 32d68b3..492dbc1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -30,7 +30,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; * A GroupScan operator represents all data which will be scanned by a given physical * plan. It is the superset of all SubScans for the plan. */ -public interface GroupScan extends Scan, HasAffinity{ +public interface GroupScan extends Scan, HasAffinity{ public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException; @@ -45,10 +45,19 @@ public interface GroupScan extends Scan, HasAffinity{ */ @JsonIgnore public abstract String getDigest(); - + /** - * Returns a clone of Groupscan instance, except that the new GroupScan will use the provided list of columns + * Returns a clone of Groupscan instance, except that the new GroupScan will use the provided list of columns . + * */ @JsonIgnore public GroupScan clone(List<SchemaPath> columns); + + /** + * GroupScan should check the list of columns, and see if it could support all the columns in the list. + * If it can not support any of them, return null. 
Null indicates that this groupscan will not support + * project pushdown for this list of columns. + */ + public List<SchemaPath> checkProjPush(List<SchemaPath> columns); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java index 98028b8..0eae1da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner.logical; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -26,6 +27,9 @@ import java.util.TreeSet; import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.physical.PrelUtil; import org.eigenbase.rel.ProjectRel; import org.eigenbase.rel.ProjectRelBase; import org.eigenbase.rel.RelNode; @@ -40,6 +44,7 @@ import org.eigenbase.rex.RexInputRef; import org.eigenbase.rex.RexNode; import org.eigenbase.rex.RexShuttle; +import com.google.common.base.Objects; import com.google.hive12.common.collect.Lists; public class DrillPushProjIntoScan extends RelOptRule { @@ -58,18 +63,35 @@ public class DrillPushProjIntoScan extends RelOptRule { RelDataType newScanRowType = createStructType(scan.getCluster().getTypeFactory(), getProjectedFields(scan.getRowType(),columnsIds)); - final DrillScanRel newScan = new DrillScanRel(scan.getCluster(), 
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scan.getTable(), newScanRowType); + DrillTable drillTable = scan.getTable().unwrap(DrillTable.class); + try { + List<SchemaPath> columns = PrelUtil.getColumns(newScanRowType); - List<RexNode> convertedExprs = getConvertedProjExp(proj, scan, columnsIds); + GroupScan groupScan = drillTable.getGroupScan(); - final DrillProjectRel newProj = new DrillProjectRel(proj.getCluster(), proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - newScan, convertedExprs, proj.getRowType()); + //Check if the group scan can support the list of columns. If not support, return without doing any further transformation. + List<SchemaPath> pushedColumns = groupScan.checkProjPush(columns); - if (RemoveTrivialProjectRule.isTrivial(newProj)) { - call.transformTo(newScan); - } else { - call.transformTo(newProj); + if (pushedColumns == null || pushedColumns.isEmpty()) + return; + + final DrillScanRel newScan = new DrillScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scan.getTable(), newScanRowType, columns); + + List<RexNode> convertedExprs = getConvertedProjExp(proj, scan, columnsIds); + + final DrillProjectRel newProj = new DrillProjectRel(proj.getCluster(), proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + newScan, convertedExprs, proj.getRowType()); + + if (RemoveTrivialProjectRule.isTrivial(newProj)) { + call.transformTo(newScan); + } else { + call.transformTo(newProj); + } + + } catch (IOException e) { + e.printStackTrace(); + return; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 5521c4e..4defacd 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -57,7 +57,7 @@ public class DrillRuleSets { // SwapJoinRule.INSTANCE, RemoveDistinctRule.INSTANCE, // UnionToDistinctRule.INSTANCE, -// RemoveTrivialProjectRule.INSTANCE, + RemoveTrivialProjectRule.INSTANCE, // RemoveTrivialCalcRule.INSTANCE, RemoveSortRule.INSTANCE, @@ -66,14 +66,14 @@ public class DrillRuleSets { new MergeProjectRule(true, RelFactories.DEFAULT_PROJECT_FACTORY), RemoveDistinctAggregateRule.INSTANCE, // ReduceAggregatesRule.INSTANCE, // -// PushProjectPastJoinRule.INSTANCE, -// PushProjectPastFilterRule.INSTANCE, + PushProjectPastJoinRule.INSTANCE, + PushProjectPastFilterRule.INSTANCE, // SwapJoinRule.INSTANCE, // // PushJoinThroughJoinRule.RIGHT, // // PushJoinThroughJoinRule.LEFT, // // PushSortPastProjectRule.INSTANCE, // -// DrillPushProjIntoScan.INSTANCE, + DrillPushProjIntoScan.INSTANCE, //////////////////////////////// DrillScanRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java index 2b4f9d7..46394a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.planner.logical; import java.io.IOException; +import java.util.List; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.LogicalOperator; import 
org.apache.drill.common.logical.data.Scan; import org.apache.drill.exec.physical.OperatorCost; @@ -45,21 +47,28 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { /** Creates a DrillScan. */ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table) { - this(cluster, traits, table, table.getRowType()); + // By default, scan does not support project pushdown. + // Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule. + this(cluster, traits, table, table.getRowType(), null); } /** Creates a DrillScan. */ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, - RelOptTable table, RelDataType rowType) { + RelOptTable table, RelDataType rowType, List<SchemaPath> columns) { super(DRILL_LOGICAL, cluster, traits, table); this.rowType = rowType; + try { - this.groupScan = this.drillTable.getGroupScan().clone( - PrelUtil.getColumns(rowType)); + if (columns == null || columns.isEmpty()) { + this.groupScan = this.drillTable.getGroupScan(); + } else { + this.groupScan = this.drillTable.getGroupScan().clone(columns); + } } catch (IOException e) { this.groupScan = null; e.printStackTrace(); } + } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java index 53804c7..d69f8cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java @@ -84,21 +84,26 @@ public class PrelUtil { } return new SelectionVectorRemover(child); } - - public static List<SchemaPath> getColumns(RelDataType rowType) { + + public static 
List<SchemaPath> getColumns(RelDataType rowType) { final List<String> fields = rowType.getFieldNames(); - + if (fields.isEmpty()) return null; - + List<SchemaPath> columns = Lists.newArrayList(); - + for (String field : fields) { + //If star column is required, no project pushdown. Just return null, to indicate SCAN should get ALL the columns. if (field.startsWith("*")) - continue; - + return null; + columns.add(SchemaPath.getSimplePath(field)); - } - return columns; - } + } + + if (columns.isEmpty()) + return null; + else + return columns; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java index 880c6fa..9e3bac0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java @@ -38,10 +38,7 @@ public class ScanPrule extends RelOptRule{ public void onMatch(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(0); - List<SchemaPath> columns = PrelUtil.getColumns(scan.getRowType()); - columns = columns == null || columns.isEmpty() ? null : columns; - - GroupScan groupScan = scan.getGroupScan().clone(columns); + GroupScan groupScan = scan.getGroupScan(); DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? 
DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java index 296f400..dec9222 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java @@ -55,6 +55,8 @@ public class ExplainHandler extends DefaultSqlHandler{ SqlNode validated = validateNode(sqlNode); RelNode rel = convertToRel(validated); DrillRel drel = convertToDrel(rel); + log("Optiq Logical", rel); + log("Drill Logical", drel); if(mode == ResultMode.LOGICAL){ LogicalExplain logicalResult = new LogicalExplain(drel); @@ -62,8 +64,10 @@ public class ExplainHandler extends DefaultSqlHandler{ } Prel prel = convertToPrel(drel); + log("Drill Physical", prel); PhysicalOperator pop = convertToPop(prel); PhysicalPlan plan = convertToPlan(pop); + log("Drill Plan", plan); PhysicalExplain physicalResult = new PhysicalExplain(prel, plan); return DirectPlan.createDirectPlan(context, physicalResult); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 9b35204..f94cff8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -57,7 +57,7 @@ public class EasyGroupScan extends AbstractGroupScan{ private final EasyFormatPlugin<?> formatPlugin; private final int maxWidth; private List<SchemaPath> columns; - + private ListMultimap<Integer, CompleteFileWork> mappings; private List<CompleteFileWork> chunks; private List<EndpointAffinity> endpointAffinities; @@ -68,7 +68,7 @@ public class EasyGroupScan extends AbstractGroupScan{ @JsonProperty("files") List<String> files, // @JsonProperty("storage") StoragePluginConfig storageConfig, // @JsonProperty("format") FormatPluginConfig formatConfig, // - @JacksonInject StoragePluginRegistry engineRegistry, // + @JacksonInject StoragePluginRegistry engineRegistry, // @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("selectionRoot") String selectionRoot ) throws IOException, ExecutionSetupException { @@ -88,10 +88,10 @@ public class EasyGroupScan extends AbstractGroupScan{ this.columns = columns; this.selectionRoot = selectionRoot; } - + public EasyGroupScan( FileSelection selection, // - EasyFormatPlugin<?> formatPlugin, // + EasyFormatPlugin<?> formatPlugin, // List<SchemaPath> columns, String selectionRoot ) throws IOException{ @@ -120,7 +120,7 @@ public class EasyGroupScan extends AbstractGroupScan{ this.selection = that.selection; this.selectionRoot = that.selectionRoot; } - + public String getSelectionRoot() { return selectionRoot; } @@ -144,23 +144,25 @@ public class EasyGroupScan extends AbstractGroupScan{ public List<String> getFiles() { return selection.getAsFiles(); } - + + @JsonProperty("columns") + public List<SchemaPath> getColumns(){ + return columns; + } + + @JsonIgnore public FileSelection getFileSelection(){ return selection; } - + @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); return this; } - 
@JsonProperty("columns") - public List<SchemaPath> getColumns(){ - return columns; - } - + @Override public List<EndpointAffinity> getOperatorAffinity() { assert chunks != null && chunks.size() > 0; @@ -189,7 +191,7 @@ public class EasyGroupScan extends AbstractGroupScan{ return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot); } - + private List<FileWorkImpl> convert(List<CompleteFileWork> list){ List<FileWorkImpl> newList = Lists.newArrayList(); for(CompleteFileWork f : list){ @@ -197,7 +199,7 @@ public class EasyGroupScan extends AbstractGroupScan{ } return newList; } - + @JsonProperty("storage") public StoragePluginConfig getStorageConfig(){ return formatPlugin.getStorageConfig(); @@ -210,7 +212,7 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public String toString() { - return "EasyGroupScan [selectionRoot=" + selectionRoot + "]"; + return "EasyGroupScan [selectionRoot=" + selectionRoot + ", columns = " + columns + "]"; } @Override @@ -224,5 +226,10 @@ public class EasyGroupScan extends AbstractGroupScan{ newScan.columns = columns; return newScan; } - + + @Override + public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { + return columns; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java index 9169be4..256b6b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import 
org.apache.drill.common.exceptions.PhysicalOperatorSetupException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; @@ -82,5 +83,4 @@ public class DirectGroupScan extends AbstractGroupScan{ public String getDigest() { return String.valueOf(reader); } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java index b0d8ca5..5202038 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java @@ -43,25 +43,25 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaGroupScan.class); private final SelectedTable table; - + private List<SchemaPath> columns; - + @JsonCreator public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table, @JsonProperty("columns") List<SchemaPath> columns) { this.table = table; this.columns = columns; } - + private InfoSchemaGroupScan(InfoSchemaGroupScan that) { this.table = that.table; this.columns = that.columns; } - + public List<SchemaPath> getColumns() { return columns; - } - + } + @Override public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { Preconditions.checkArgument(endpoints.size() == 1); @@ -108,5 +108,6 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ InfoSchemaGroupScan newScan = new 
InfoSchemaGroupScan (this); newScan.columns = columns; return newScan; - } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index a8fff8a..4d4ec9b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -77,7 +77,7 @@ public class ParquetGroupScan extends AbstractGroupScan { static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes"); static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments"); static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity"); - + final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST); private ListMultimap<Integer, RowGroupInfo> mappings; @@ -111,7 +111,7 @@ public class ParquetGroupScan extends AbstractGroupScan { @JsonProperty("entries") List<ReadEntryWithPath> entries, // @JsonProperty("storage") StoragePluginConfig storageConfig, // @JsonProperty("format") FormatPluginConfig formatConfig, // - @JacksonInject StoragePluginRegistry engineRegistry, // + @JacksonInject StoragePluginRegistry engineRegistry, // @JsonProperty("columns") List<SchemaPath> columns, // @JsonProperty("selectionRoot") String selectionRoot // ) throws IOException, ExecutionSetupException { @@ -142,19 +142,19 @@ public class ParquetGroupScan extends AbstractGroupScan { this.columns = columns; this.formatConfig = formatPlugin.getConfig(); this.fs = formatPlugin.getFileSystem().getUnderlying(); 
- + this.entries = Lists.newArrayList(); for(FileStatus file : files){ entries.add(new ReadEntryWithPath(file.getPath().toString())); } - + this.selectionRoot = selectionRoot; readFooter(files); } - + /* - * This is used to clone another copy of the group scan. + * This is used to clone another copy of the group scan. */ private ParquetGroupScan(ParquetGroupScan that){ this.columns = that.columns; @@ -175,13 +175,13 @@ public class ParquetGroupScan extends AbstractGroupScan { } readFooter(files); } - + private void readFooter(List<FileStatus> statuses) throws IOException { watch.reset(); watch.start(); Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time(); - - + + rowGroupInfos = Lists.newArrayList(); long start = 0, length = 0; ColumnChunkMetaData columnChunkMetaData; @@ -267,12 +267,12 @@ public class ParquetGroupScan extends AbstractGroupScan { /** * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each * rowGroup - * + * * @return a list of EndpointAffinity objects */ @Override public List<EndpointAffinity> getOperatorAffinity() { - + if (this.endpointAffinities == null) { BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits()); try{ @@ -311,8 +311,8 @@ public class ParquetGroupScan extends AbstractGroupScan { return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot); } - - + + private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups){ List<RowGroupReadEntry> entries = Lists.newArrayList(); for (RowGroupInfo rgi : rowGroups) { @@ -371,4 +371,9 @@ public class ParquetGroupScan extends AbstractGroupScan { newScan.columns = columns; return newScan; } + + @Override + public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { + return columns; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e790e796/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java index a8c059b..13bb1ac 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java @@ -79,7 +79,7 @@ public class TestProjectPushDown extends PlanTestBase { } @Test - @Ignore + @Ignore // InfoSchema do not support project pushdown currently. public void testFromInfoSchema() throws Exception { String expectedColNames = " \"columns\" : [ \"`CATALOG_DESCRIPTION`\" ]"; testPhysicalPlan(
