Repository: incubator-drill Updated Branches: refs/heads/master 1e0e4dfc9 -> 2ae4a5f0f
DRILL-494: Support for Storage Plugin Optimizer Rules Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bcc3c731 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bcc3c731 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bcc3c731 Branch: refs/heads/master Commit: bcc3c731caa2094ea046fdab2c9707e9084c39a9 Parents: 1e0e4df Author: Aditya Kishore <[email protected]> Authored: Mon Apr 7 23:37:42 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat May 3 17:51:45 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/hbase/HBaseGroupScan.java | 5 +++ .../drill/exec/physical/base/GroupScan.java | 7 ++++ .../exec/planner/logical/DrillRuleSets.java | 21 +++++++--- .../drill/exec/planner/physical/ScanPrel.java | 43 ++++++++++++++++---- .../drill/exec/planner/physical/ScanPrule.java | 19 +++------ .../drill/exec/planner/sql/DrillSqlWorker.java | 33 +++++++++++++-- .../drill/exec/store/AbstractStoragePlugin.java | 9 ++-- .../drill/exec/store/QueryOptimizerRule.java | 21 ---------- .../apache/drill/exec/store/StoragePlugin.java | 4 +- .../exec/store/StoragePluginOptimizerRule.java | 29 +++++++++++++ .../drill/exec/store/StoragePluginRegistry.java | 30 ++++++++++++-- .../drill/exec/store/dfs/FormatPlugin.java | 7 ++-- .../drill/exec/store/dfs/ReadEntryWithPath.java | 5 +++ .../exec/store/dfs/easy/EasyFormatPlugin.java | 15 ++++--- .../exec/store/dfs/easy/EasyGroupScan.java | 12 +++++- .../exec/store/direct/DirectGroupScan.java | 5 +++ .../apache/drill/exec/store/hive/HiveScan.java | 14 +++++++ .../exec/store/ischema/InfoSchemaGroupScan.java | 6 +++ .../drill/exec/store/mock/MockGroupScanPOP.java | 22 ++++++++++ .../exec/store/parquet/ParquetFormatPlugin.java | 21 ++++++---- .../exec/store/parquet/ParquetGroupScan.java | 12 ++++++ .../java/org/apache/drill/PlanningBase.java | 4 +- 22 files changed, 260 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index b8b6af4..f309b3b 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -197,4 +197,9 @@ public class HBaseGroupScan extends AbstractGroupScan { return this; } + @Override + public String getDigest() { + return toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 314e889..3504be7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -37,4 +37,11 @@ public interface GroupScan extends Scan, HasAffinity{ @JsonIgnore public int getMaxParallelizationWidth(); + + /** + * Returns a signature of the {@link GroupScan} which should usually be composed of + * all its attributes which could describe it uniquely. + */ + @JsonIgnore + public abstract String getDigest(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 1492a28..199d36a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -36,20 +36,15 @@ import org.eigenbase.rel.rules.MergeProjectRule; import org.eigenbase.rel.rules.PushFilterPastJoinRule; import org.eigenbase.rel.rules.PushFilterPastProjectRule; import org.eigenbase.rel.rules.PushJoinThroughJoinRule; -import org.eigenbase.rel.rules.PushSortPastProjectRule; import org.eigenbase.rel.rules.ReduceAggregatesRule; import org.eigenbase.rel.rules.RemoveDistinctAggregateRule; import org.eigenbase.rel.rules.RemoveDistinctRule; import org.eigenbase.rel.rules.RemoveSortRule; -import org.eigenbase.rel.rules.RemoveTrivialCalcRule; -import org.eigenbase.rel.rules.RemoveTrivialProjectRule; -import org.eigenbase.rel.rules.SwapJoinRule; -import org.eigenbase.rel.rules.TableAccessRule; -import org.eigenbase.rel.rules.UnionToDistinctRule; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.volcano.AbstractConverter.ExpandConversionRule; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSet.Builder; public class DrillRuleSets { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class); @@ -144,6 +139,20 @@ public class DrillRuleSets { )); + public static RuleSet create(ImmutableSet<RelOptRule> rules) { + return new DrillRuleSet(rules); + } + + public static RuleSet mergedRuleSets(RuleSet...ruleSets) { + Builder<RelOptRule> relOptRuleSetBuilder = ImmutableSet.builder(); + for (RuleSet ruleSet : ruleSets) { + for (RelOptRule relOptRule : ruleSet) { + relOptRuleSetBuilder.add(relOptRule); + } + } + return new DrillRuleSet(relOptRuleSetBuilder.build()); + } + private static class DrillRuleSet implements RuleSet{ final ImmutableSet<RelOptRule> rules; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index a945129..b22e862 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -20,44 +20,69 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.List; -import org.apache.drill.common.JSONOptions; +import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.planner.common.DrillScanRelBase; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.store.StoragePlugin; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.RelWriter; import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelOptTable; import org.eigenbase.relopt.RelTraitSet; public class ScanPrel extends DrillScanRelBase implements Prel{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanPrel.class); - public ScanPrel(RelOptCluster cluster, RelTraitSet traits, RelOptTable tbl) { + protected final GroupScan scan; + + private ScanPrel(RelOptCluster cluster, RelTraitSet traits, RelOptTable tbl, GroupScan scan) { super(DRILL_PHYSICAL, cluster, traits, tbl); + this.scan = scan; } - @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return super.copy(traitSet, inputs); + return new ScanPrel(this.getCluster(), traitSet, this.getTable(), this.scan); } @Override protected Object clone() throws CloneNotSupportedException { - return super.clone(); + return new ScanPrel(this.getCluster(), this.getTraitSet(), this.getTable(), this.scan); } @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - StoragePlugin plugin = this.drillTable.getPlugin(); - GroupScan scan = plugin.getPhysicalScan(new JSONOptions(drillTable.getSelection())); + return scan; + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + OperatorCost scanCost = this.scan.getCost(); + Size scanSize = this.scan.getSize(); + // FIXME: Use the new cost model + return this.getCluster().getPlanner().getCostFactory() + .makeCost(scanSize.getRecordCount(), + scanCost.getCpu(), + scanCost.getNetwork() * scanCost.getDisk()); + } + public GroupScan getGroupScan() { return scan; } + public static ScanPrel create(DrillScanRelBase old, RelTraitSet traitSets, GroupScan scan) { + return new ScanPrel(old.getCluster(), traitSets, old.getTable(), scan); + } + + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).item("groupscan", scan.getDigest()); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java index 8985a58..99ec1f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java @@ -19,9 +19,10 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.common.DrillScanRelBase; import org.apache.drill.exec.planner.logical.DrillTable; -import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; @@ -39,22 +40,14 @@ public class ScanPrule extends RelOptRule{ public void onMatch(RelOptRuleCall call) { try{ final DrillScanRelBase scan = (DrillScanRelBase) call.rel(0); - DrillTable table = scan.getTable().unwrap(DrillTable.class); - - DrillDistributionTrait partition = table.getGroupScan().getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON; + final DrillTable table = scan.getTable().unwrap(DrillTable.class); -// DrillDistributionTrait partition = DrillDistributionTrait.SINGLETON; -// -// if (table instanceof DynamicDrillTable ) { -// if (table.getGroupScan().getMaxParallelizationWidth() > 1 ) -// partition = DrillDistributionTrait.RANDOM_DISTRIBUTED; -// } - + final GroupScan groupScan = table.getPlugin().getPhysicalScan(new JSONOptions(table.getSelection())); + final DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON; final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition); - DrillScanRelBase newScan = new ScanPrel(scan.getCluster(), traits, scan.getTable()); + final DrillScanRelBase newScan = ScanPrel.create(scan, traits, groupScan); call.transformTo(newScan); - }catch(IOException e){ throw new RuntimeException("Failure getting group scan.", e); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index 8892a8f..4336a1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -33,9 +33,20 @@ import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.logical.DrillRuleSets; import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef; -import org.apache.drill.exec.planner.sql.handlers.*; -import org.apache.drill.exec.planner.sql.parser.*; +import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler; +import org.apache.drill.exec.planner.sql.handlers.ExplainHandler; +import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler; +import org.apache.drill.exec.planner.sql.handlers.ShowSchemasHandler; +import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandler; +import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler; +import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable; +import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas; +import org.apache.drill.exec.planner.sql.parser.SqlShowTables; +import org.apache.drill.exec.planner.sql.parser.SqlUseSchema; import org.apache.drill.exec.planner.sql.parser.impl.DrillParserImpl; +import org.apache.drill.exec.store.StoragePluginRegistry; import org.eigenbase.rel.RelCollationTraitDef; import org.eigenbase.relopt.ConventionTraitDef; import org.eigenbase.relopt.RelTraitDef; @@ -47,11 +58,12 @@ public class DrillSqlWorker { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class); private final Planner planner; - private final static RuleSet[] RULES = new RuleSet[]{DrillRuleSets.DRILL_BASIC_RULES, DrillRuleSets.DRILL_PHYSICAL_MEM}; public final static int LOGICAL_RULES = 0; public final static int PHYSICAL_MEM_RULES = 1; private final QueryContext context; + private static volatile RuleSet[] allRules; + public DrillSqlWorker(QueryContext context) throws Exception { final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); @@ -68,12 +80,25 @@ public class DrillSqlWorker { .traitDefs(traitDefs) // .convertletTable(StandardConvertletTable.INSTANCE) // .context(context.getPlannerSettings()) // - .ruleSets(RULES) // + .ruleSets(getRules(context.getStorage())) // .build(); this.planner = Frameworks.getPlanner(config); } + private static RuleSet[] getRules(StoragePluginRegistry storagePluginRegistry) { + if (allRules == null) { + synchronized (DrillSqlWorker.class) { + if (allRules == null) { + RuleSet dirllPhysicalMem = DrillRuleSets.mergedRuleSets( + DrillRuleSets.DRILL_PHYSICAL_MEM, storagePluginRegistry.getStoragePluginRuleSet()); + allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, dirllPhysicalMem}; + } + } + } + return allRules; + } + public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException, IOException{ SqlNode sqlNode = planner.parse(sql); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index 3081b46..2a9ae39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -18,12 +18,13 @@ package org.apache.drill.exec.store; import java.io.IOException; -import java.util.Collections; -import java.util.List; +import java.util.Set; import org.apache.drill.common.JSONOptions; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import com.google.common.collect.ImmutableSet; + public abstract class AbstractStoragePlugin implements StoragePlugin{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStoragePlugin.class); @@ -41,8 +42,8 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{ } @Override - public List<QueryOptimizerRule> getOptimizerRules() { - return Collections.emptyList(); + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java deleted file mode 100644 index ec6465e..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store; - -public interface QueryOptimizerRule { -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java index 67f6df3..9f528bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java @@ -18,7 +18,7 @@ package org.apache.drill.exec.store; import java.io.IOException; -import java.util.List; +import java.util.Set; import net.hydromatic.optiq.SchemaPlus; @@ -32,7 +32,7 @@ public interface StoragePlugin extends SchemaFactory{ public boolean supportsWrite(); - public List<QueryOptimizerRule> getOptimizerRules(); + public Set<StoragePluginOptimizerRule> getOptimizerRules(); /** * Get the physical scan operator for the particular GroupScan (read) node. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginOptimizerRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginOptimizerRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginOptimizerRule.java new file mode 100644 index 0000000..32ec6ff --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginOptimizerRule.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleOperand; + +public abstract class StoragePluginOptimizerRule extends RelOptRule { + + public StoragePluginOptimizerRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index 7a88098..4d88686 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -26,12 +26,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import com.google.common.base.Preconditions; import net.hydromatic.linq4j.expressions.DefaultExpression; import net.hydromatic.linq4j.expressions.Expression; import net.hydromatic.optiq.SchemaPlus; -import net.hydromatic.optiq.tools.Frameworks; +import net.hydromatic.optiq.tools.RuleSet; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -39,6 +40,7 @@ import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.common.util.PathScanner; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.planner.logical.DrillRuleSets; import org.apache.drill.exec.cache.DistributedMap; import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable; import org.apache.drill.exec.exception.DrillbitStartupException; @@ -49,9 +51,12 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.ischema.InfoSchemaConfig; import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin; +import org.eigenbase.relopt.RelOptRule; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSet.Builder; import com.google.common.io.Resources; @@ -64,6 +69,8 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage private DrillbitContext context; private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory(); + private RuleSet storagePluginsRuleSet; + private static final Expression EXPRESSION = new DefaultExpression(Object.class); public StoragePluginRegistry(DrillbitContext context) { @@ -84,7 +91,10 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage int i =0; for(Constructor<?> c : plugin.getConstructors()){ Class<?>[] params = c.getParameterTypes(); - if(params.length != 3 || params[1] != DrillbitContext.class || !StoragePluginConfig.class.isAssignableFrom(params[0]) || params[2] != String.class){ + if(params.length != 3 + || params[1] != DrillbitContext.class + || !StoragePluginConfig.class.isAssignableFrom(params[0]) + || params[2] != String.class){ logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin); continue; } @@ -96,10 +106,20 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage } } + // create registered plugins defined in "storage-plugins.json" this.plugins = ImmutableMap.copyOf(createPlugins()); + // query registered engines for optimizer rules and build the storage plugin RuleSet + Builder<RelOptRule> setBuilder = ImmutableSet.builder(); + for (StoragePlugin plugin : this.plugins.values()) { + Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules(); + if (rules != null && rules.size() > 0) { + setBuilder.addAll(rules); + } + } + this.storagePluginsRuleSet = DrillRuleSets.create(setBuilder.build()); } - + private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException { /* * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache. @@ -192,6 +212,10 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage return plugins.entrySet().iterator(); } + public RuleSet getStoragePluginRuleSet() { + return storagePluginsRuleSet; + } + public DrillSchemaFactory getSchemaFactory(){ return schemaFactory; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java index 73e414c..2999d9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java @@ -18,14 +18,13 @@ package org.apache.drill.exec.store.dfs; import java.io.IOException; -import java.util.List; +import java.util.Set; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.QueryOptimizerRule; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; /** @@ -41,7 +40,7 @@ public interface FormatPlugin { public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException; - public List<QueryOptimizerRule> getOptimizerRules(); + public Set<StoragePluginOptimizerRule> getOptimizerRules(); public FormatPluginConfig getConfig(); public StoragePluginConfig getStorageConfig(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java index bf1d762..e421eec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java @@ -34,4 +34,9 @@ public class ReadEntryWithPath { return path; } + @Override + public String toString() { + return "ReadEntryWithPath [path=" + path + "]"; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index 6e87da5..c5a5294 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -18,14 +18,13 @@ package org.apache.drill.exec.store.dfs.easy; import java.io.IOException; -import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; @@ -35,18 +34,19 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.QueryOptimizerRule; import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.BasicFormatMatcher; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FormatMatcher; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; - -import com.beust.jcommander.internal.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import com.beust.jcommander.internal.Lists; +import com.google.common.collect.ImmutableSet; + public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class); @@ -187,9 +187,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } @Override - public List<QueryOptimizerRule> getOptimizerRules() { - return Collections.emptyList(); + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(); } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 68fee34..a4bd1c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; @@ -196,4 +195,15 @@ public class EasyGroupScan extends AbstractGroupScan{ public FormatPluginConfig getFormatConfig(){ return formatPlugin.getConfig(); } + + @Override + public String toString() { + return "EasyGroupScan [selectionRoot=" + selectionRoot + "]"; + } + + @Override + public String getDigest() { + return toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java index cfbeb83..9169be4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java @@ -78,4 +78,9 @@ public class DirectGroupScan extends AbstractGroupScan{ return Collections.emptyList(); } + @Override + public String getDigest() { + return String.valueOf(reader); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index bf33805..7c46d15 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; + import org.apache.commons.codec.binary.Base64; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -244,4 +245,17 @@ public class HiveScan extends AbstractGroupScan { public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { return new HiveScan(hiveReadEntry, storagePlugin, columns); } + + @Override + public String getDigest() { + return toString(); + } + + @Override + public String toString() { + return "HiveScan [table=" + table + + ", inputSplits=" + inputSplits + + ", columns=" + columns + "]"; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java index b8b66dc..a9261be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java @@ -81,4 +81,10 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ public List<EndpointAffinity> getOperatorAffinity() { return Collections.emptyList(); } + + @Override + public String getDigest() { + return this.table.toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java index b71784b..8ae5116 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.mock; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -109,6 +110,11 @@ public class MockGroupScanPOP extends AbstractGroupScan { public Size getSize() { return new Size(records, recordSize); } + + @Override + public String toString() { + return "MockScanEntry [records=" + records + ", columns=" + Arrays.toString(types) + "]"; + } } @JsonInclude(Include.NON_NULL) @@ -161,6 +167,11 @@ public class MockGroupScanPOP extends AbstractGroupScan { if(scale != null) b.setScale(scale); return b.build(); } + + @Override + public String toString() { + return "MockColumn [minorType=" + minorType + ", name=" + name + ", mode=" + mode + "]"; + } } @@ -218,4 +229,15 @@ public class MockGroupScanPOP extends AbstractGroupScan { } + @Override + public String getDigest() { + return toString(); + } + + @Override + public String toString() { + return "MockGroupScanPOP [url=" + url + + ", readEntries=" + readEntries + "]"; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index d9e6795..4fd0587 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -18,26 +18,31 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; -import java.util.Collections; -import java.util.List; +import java.util.Set; import java.util.regex.Pattern; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.QueryOptimizerRule; -import org.apache.drill.exec.store.dfs.*; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.dfs.BasicFormatMatcher; +import org.apache.drill.exec.store.dfs.DrillPathFilter; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.dfs.MagicString; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; import org.apache.drill.exec.store.mock.MockStorageEngine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; - import org.apache.hadoop.fs.PathFilter; + import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileWriter; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; public class ParquetFormatPlugin implements FormatPlugin{ @@ -89,8 +94,8 @@ public class ParquetFormatPlugin implements FormatPlugin{ } @Override - public List<QueryOptimizerRule> getOptimizerRules() { - return Collections.emptyList(); + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index cd7575d..6c3bd27 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -336,4 +336,16 @@ public class ParquetGroupScan extends AbstractGroupScan { return this; } + @Override + public String getDigest() { + return toString(); + } + + @Override + public String toString() { + return "ParquetGroupScan [entries=" + entries + + ", selectionRoot=" + selectionRoot + + ", columns=" + columns + "]"; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bcc3c731/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index a3a5647..948c763 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -81,7 +81,7 @@ public class PlanningBase extends ExecTest{ } }; - StoragePluginRegistry registry = new StoragePluginRegistry(dbContext); + final StoragePluginRegistry registry = new StoragePluginRegistry(dbContext); registry.init(); final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config); final SchemaPlus root = Frameworks.createRootSchema(); @@ -92,6 +92,8 @@ public class PlanningBase extends ExecTest{ { context.getNewDefaultSchema(); result = root; + context.getStorage(); + result = registry; context.getFunctionRegistry(); result = functionRegistry; context.getSession();
