Repository: tajo Updated Branches: refs/heads/master 97e61e6f4 -> 9840d3785
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java new file mode 100644 index 0000000..afabe7a --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.rewrite.rules; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.SessionVars; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.IndexScanNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.RelationNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; + +import java.util.List; +import java.util.Stack; + +public class AccessPathRewriter implements LogicalPlanRewriteRule { + private static final Log LOG = LogFactory.getLog(AccessPathRewriter.class); + + private static final String NAME = "Access Path Rewriter"; + private Rewriter rewriter = new Rewriter(); + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean isEligible(LogicalPlanRewriteRuleContext context) { + if (context.getQueryContext().getBool(SessionVars.INDEX_ENABLED)) { + for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) { + for (RelationNode relationNode : block.getRelations()) { + List<AccessPathInfo> accessPathInfos = block.getAccessInfos(relationNode); + // If there are any alternative access paths + if (accessPathInfos.size() > 1) { + for (AccessPathInfo accessPathInfo : accessPathInfos) { + if (accessPathInfo.getScanType() == AccessPathInfo.ScanTypeControl.INDEX_SCAN) { + return true; + } + } + } + } + } + } + return false; + } + + @Override + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { + LogicalPlan plan = context.getPlan(); + LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); + rewriter.init(context.getQueryContext()); + rewriter.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>()); + return plan; + } + + private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> { + + private OverridableConf conf; + + public void init(OverridableConf conf) { + this.conf = conf; + } + + @Override + public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode, + Stack<LogicalNode> stack) throws TajoException { + List<AccessPathInfo> accessPaths = block.getAccessInfos(scanNode); + AccessPathInfo optimalPath = null; + // initialize + for (AccessPathInfo accessPath : accessPaths) { + if (accessPath.getScanType() == AccessPathInfo.ScanTypeControl.SEQ_SCAN) { + optimalPath = accessPath; + break; + } + } + // find the optimal path + for (AccessPathInfo accessPath : accessPaths) { + if (accessPath.getScanType() == AccessPathInfo.ScanTypeControl.INDEX_SCAN) { + // estimation selectivity and choose the better path + // TODO: improve the selectivity estimation + double estimateSelectivity = 0.001; + double selectivityThreshold = conf.getFloat(SessionVars.INDEX_SELECTIVITY_THRESHOLD); + LOG.info("Selectivity threshold: " + selectivityThreshold); + LOG.info("Estimated selectivity: " + estimateSelectivity); + if (estimateSelectivity < selectivityThreshold) { + // if the estimated selectivity is greater than threshold, use the index scan + optimalPath = accessPath; + } + } + } + + if (optimalPath != null && optimalPath.getScanType() == AccessPathInfo.ScanTypeControl.INDEX_SCAN) { + IndexScanInfo indexScanInfo = (IndexScanInfo) optimalPath; + plan.addHistory("AccessPathRewriter chooses the index scan for " + scanNode.getTableName()); + IndexScanNode indexScanNode = new IndexScanNode(plan.newPID(), scanNode, indexScanInfo.getKeySchema(), + indexScanInfo.getPredicates(), indexScanInfo.getIndexPath()); + if (stack.empty() || block.getRoot().equals(scanNode)) { + block.setRoot(indexScanNode); + } else { + PlannerUtil.replaceNode(plan, stack.peek(), scanNode, indexScanNode); + } + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java index 98e674e..4fb8aac 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java @@ -21,10 +21,13 @@ package org.apache.tajo.plan.rewrite.rules; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.OverridableConf; import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.catalog.*; +import org.apache.tajo.datum.Datum; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; @@ -35,7 +38,10 @@ import org.apache.tajo.plan.*; import org.apache.tajo.plan.LogicalPlan.QueryBlock; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext; +import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; +import org.apache.tajo.plan.util.IndexUtil; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; @@ -52,6 +58,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class); private static final String NAME = "FilterPushDown"; + private CatalogService catalog; + static class FilterPushDownContext { Set<EvalNode> pushingDownFilters = TUtil.newHashSet(); @@ -85,8 +93,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { - for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { + public boolean isEligible(LogicalPlanRewriteRuleContext context) { + for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) { if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) { return true; } @@ -95,7 +103,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) throws TajoException { /* FilterPushDown rule: processing when visits each node - If a target which is corresponding on a filter EvalNode's column is not FieldEval, do not PushDown. @@ -107,6 +115,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo . It not, create new HavingNode and set parent's child. */ FilterPushDownContext context = new FilterPushDownContext(); + LogicalPlan plan = rewriteRuleContext.getPlan(); + catalog = rewriteRuleContext.getCatalog(); for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { context.clear(); this.visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>()); @@ -877,7 +887,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo @Override public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan, - LogicalPlan.QueryBlock block, ScanNode scanNode, + LogicalPlan.QueryBlock block, final ScanNode scanNode, Stack<LogicalNode> stack) throws TajoException { List<EvalNode> matched = TUtil.newList(); @@ -947,8 +957,42 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo qual = matched.iterator().next(); } + block.addAccessPath(scanNode, new SeqScanInfo(table)); if (qual != null) { // if a matched qual exists scanNode.setQual(qual); + + // Index path can be identified only after filters are pushed into each scan. + String databaseName, tableName; + databaseName = CatalogUtil.extractQualifier(table.getName()); + tableName = CatalogUtil.extractSimpleName(table.getName()); + Set<Predicate> predicates = TUtil.newHashSet(); + for (EvalNode eval : IndexUtil.getAllEqualEvals(qual)) { + BinaryEval binaryEval = (BinaryEval) eval; + // TODO: consider more complex predicates + if (binaryEval.getLeftExpr().getType() == EvalType.FIELD && + binaryEval.getRightExpr().getType() == EvalType.CONST) { + predicates.add(new Predicate(binaryEval.getType(), + ((FieldEval) binaryEval.getLeftExpr()).getColumnRef(), + ((ConstEval)binaryEval.getRightExpr()).getValue())); + } else if (binaryEval.getLeftExpr().getType() == EvalType.CONST && + binaryEval.getRightExpr().getType() == EvalType.FIELD) { + predicates.add(new Predicate(binaryEval.getType(), + ((FieldEval) binaryEval.getRightExpr()).getColumnRef(), + ((ConstEval)binaryEval.getLeftExpr()).getValue())); + } + } + + // for every subset of the set of columns, find all matched index paths + for (Set<Predicate> subset : Sets.powerSet(predicates)) { + if (subset.size() == 0) + continue; + Column[] columns = extractColumns(subset); + if (catalog.existIndexByColumns(databaseName, tableName, columns)) { + IndexDesc indexDesc = catalog.getIndexByColumns(databaseName, tableName, columns); + block.addAccessPath(scanNode, new IndexScanInfo( + table.getStats(), indexDesc, getSimplePredicates(indexDesc, subset))); + } + } } for (EvalNode matchedEval: matched) { @@ -961,6 +1005,49 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo return scanNode; } + private static class Predicate { + Column column; + Datum value; + EvalType evalType; + + public Predicate(EvalType evalType, Column column, Datum value) { + this.evalType = evalType; + this.column = column; + this.value = value; + } + } + + private static SimplePredicate[] getSimplePredicates(IndexDesc desc, Set<Predicate> predicates) { + SimplePredicate[] simplePredicates = new SimplePredicate[predicates.size()]; + Map<Column, Datum> colToValue = TUtil.newHashMap(); + for (Predicate predicate : predicates) { + colToValue.put(predicate.column, predicate.value); + } + SortSpec [] keySortSpecs = desc.getKeySortSpecs(); + for (int i = 0; i < keySortSpecs.length; i++) { + simplePredicates[i] = new SimplePredicate(keySortSpecs[i], + colToValue.get(keySortSpecs[i].getSortKey())); + } + return simplePredicates; + } + + private static Datum[] extractPredicateValues(List<Predicate> predicates) { + Datum[] values = new Datum[predicates.size()]; + for (int i = 0; i < values.length; i++) { + values[i] = predicates.get(i).value; + } + return values; + } + + private static Column[] extractColumns(Set<Predicate> predicates) { + Column[] columns = new Column[predicates.size()]; + int i = 0; + for (Predicate p : predicates) { + columns[i++] = p.column; + } + return columns; + } + private void errorFilterPushDown(LogicalPlan plan, LogicalNode node, FilterPushDownContext context) { String prefix = ""; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java new file mode 100644 index 0000000..9ac8ccf --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.rewrite.rules; + +import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.IndexDesc; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.serder.EvalNodeDeserializer; +import org.apache.tajo.plan.serder.EvalNodeSerializer; +import org.apache.tajo.plan.serder.PlanProto.SimplePredicateProto; + +import java.net.URI; + +public class IndexScanInfo extends AccessPathInfo { + + /** + * Simple predicate represents an equal eval expression which consists of + * a column and a value. + */ + // TODO: extend to represent more complex expressions + public static class SimplePredicate implements ProtoObject<SimplePredicateProto> { + @Expose private SortSpec keySortSpec; + @Expose private Datum value; + + public SimplePredicate(SortSpec keySortSpec, Datum value) { + this.keySortSpec = keySortSpec; + this.value = value; + } + + public SimplePredicate(SimplePredicateProto proto) { + keySortSpec = new SortSpec(proto.getKeySortSpec()); + value = EvalNodeDeserializer.deserialize(proto.getValue()); + } + + public SortSpec getKeySortSpec() { + return keySortSpec; + } + + public Datum getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (o instanceof SimplePredicate) { + SimplePredicate other = (SimplePredicate) o; + return this.keySortSpec.equals(other.keySortSpec) && this.value.equals(other.value); + } else { + return false; + } + } + + @Override + public Object clone() throws CloneNotSupportedException { + SimplePredicate clone = new SimplePredicate(this.keySortSpec, this.value); + return clone; + } + + @Override + public SimplePredicateProto getProto() { + SimplePredicateProto.Builder builder = SimplePredicateProto.newBuilder(); + builder.setKeySortSpec(keySortSpec.getProto()); + builder.setValue(EvalNodeSerializer.serialize(value)); + return builder.build(); + } + } + + private final URI indexPath; + private final Schema keySchema; + private final SimplePredicate[] predicates; + + public IndexScanInfo(TableStats tableStats, IndexDesc indexDesc, SimplePredicate[] predicates) { + super(ScanTypeControl.INDEX_SCAN, tableStats); + this.indexPath = indexDesc.getIndexPath(); + keySchema = new Schema(); + this.predicates = predicates; + for (SimplePredicate predicate : predicates) { + keySchema.addColumn(predicate.getKeySortSpec().getSortKey()); + } + } + + public URI getIndexPath() { + return indexPath; + } + + public Schema getKeySchema() { + return keySchema; + } + + public SimplePredicate[] getPredicates() { + return predicates; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java index afe0c4d..c35194e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java @@ -18,11 +18,11 @@ package org.apache.tajo.plan.rewrite.rules; -import org.apache.tajo.OverridableConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.serder.LogicalNodeDeserializer; import org.apache.tajo.plan.serder.LogicalNodeSerializer; import org.apache.tajo.plan.serder.PlanProto; @@ -40,15 +40,16 @@ public class LogicalPlanEqualityTester implements LogicalPlanRewriteRule { } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { + public boolean isEligible(LogicalPlanRewriteRuleContext context) { return true; } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { + LogicalPlan plan = context.getPlan(); LogicalNode root = plan.getRootBlock().getRoot(); PlanProto.LogicalNodeTree serialized = LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot()); - LogicalNode deserialized = LogicalNodeDeserializer.deserialize(queryContext, null, serialized); + LogicalNode deserialized = LogicalNodeDeserializer.deserialize(context.getQueryContext(), null, serialized); assert root.deepEquals(deserialized); return plan; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 71b9fd7..244d385 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -36,6 +36,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.storage.Tuple; @@ -59,8 +60,8 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { - for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { + public boolean isEligible(LogicalPlanRewriteRuleContext context) { + for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) { for (RelationNode relation : block.getRelations()) { if (relation.getType() == NodeType.SCAN) { TableDesc table = ((ScanNode)relation).getTableDesc(); @@ -74,9 +75,10 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { + LogicalPlan plan = context.getPlan(); LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); - rewriter.visit(queryContext, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>()); + rewriter.visit(context.getQueryContext(), plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>()); return plan; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java index 650a484..78a5dad 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java @@ -37,6 +37,7 @@ import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.TUtil; @@ -61,13 +62,13 @@ public class ProjectionPushDownRule extends } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { - LogicalNode toBeOptimized = plan.getRootBlock().getRoot(); + public boolean isEligible(LogicalPlanRewriteRuleContext context) { + LogicalNode toBeOptimized = context.getPlan().getRootBlock().getRoot(); if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) { return false; } - for (QueryBlock eachBlock: plan.getQueryBlocks()) { + for (QueryBlock eachBlock: context.getPlan().getQueryBlocks()) { if (eachBlock.hasTableExpression()) { return true; } @@ -76,7 +77,8 @@ public class ProjectionPushDownRule extends } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) throws TajoException { + LogicalPlan plan = rewriteRuleContext.getPlan(); LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); LogicalPlan.QueryBlock topmostBlock = rootBlock; @@ -524,6 +526,11 @@ public class ProjectionPushDownRule extends createTableNode.setChild(child); createTableNode.setInSchema(child.getOutSchema()); break; + case CREATE_INDEX: + CreateIndexNode createIndexNode = (CreateIndexNode) parentNode; + createIndexNode.setChild(child); + createIndexNode.setInSchema(child.getOutSchema()); + break; default: throw new TajoInternalError("unexpected parent node: " + parentNode.getType()); } @@ -1111,6 +1118,12 @@ public class ProjectionPushDownRule extends } @Override + public LogicalNode visitIndexScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, + IndexScanNode node, Stack<LogicalNode> stack) throws TajoException { + return visitScan(context, plan, block,node, stack); + } + + @Override public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock block, TableSubQueryNode node, Stack<LogicalNode> stack) throws TajoException { Context childContext = new Context(plan, upperContext.requiredSet); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java new file mode 100644 index 0000000..0c054bd --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.rewrite.rules; + +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.statistics.TableStats; + +public class SeqScanInfo extends AccessPathInfo { + private TableDesc tableDesc; + + public SeqScanInfo(TableStats tableStats) { + super(ScanTypeControl.SEQ_SCAN, tableStats); + } + + public SeqScanInfo(TableDesc tableDesc) { + this(tableDesc.getStats()); + this.setTableDesc(tableDesc); + } + + public TableDesc getTableDesc() { + return tableDesc; + } + + public void setTableDesc(TableDesc tableDesc) { + this.tableDesc = tableDesc; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index fe900c0..25897f2 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -34,10 +34,12 @@ import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; import java.net.URI; +import java.net.URISyntaxException; import java.util.*; /** @@ -124,6 +126,9 @@ public class LogicalNodeDeserializer { case SCAN: current = convertScan(context, evalContext, protoNode); break; + case INDEX_SCAN: + current = convertIndexScan(context, evalContext, protoNode); + break; case CREATE_TABLE: current = convertCreateTable(nodeMap, protoNode); @@ -152,6 +157,13 @@ public class LogicalNodeDeserializer { current = convertTruncateTable(protoNode); break; + case CREATE_INDEX: + current = convertCreateIndex(nodeMap, protoNode); + break; + case DROP_INDEX: + current = convertDropIndex(protoNode); + break; + default: throw new RuntimeException("Unknown NodeType: " + protoNode.getType().name()); } @@ -427,6 +439,23 @@ public class LogicalNodeDeserializer { scan.setOutSchema(convertSchema(protoNode.getOutSchema())); } + private static IndexScanNode convertIndexScan(OverridableConf context, EvalContext evalContext, + PlanProto.LogicalNode protoNode) { + IndexScanNode indexScan = new IndexScanNode(protoNode.getNodeId()); + fillScanNode(context, evalContext, protoNode, indexScan); + + PlanProto.IndexScanSpec indexScanSpec = protoNode.getIndexScan(); + SimplePredicate[] predicates = new SimplePredicate[indexScanSpec.getPredicatesCount()]; + for (int i = 0; i < predicates.length; i++) { + predicates[i] = new SimplePredicate(indexScanSpec.getPredicates(i)); + } + + indexScan.set(new Schema(indexScanSpec.getKeySchema()), predicates, + TUtil.stringToURI(indexScanSpec.getIndexPath())); + + return indexScan; + } + private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, EvalContext evalContext, PlanProto.LogicalNode protoNode) { PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId()); @@ -628,6 +657,46 @@ public class LogicalNodeDeserializer { return truncateTable; } + private static CreateIndexNode convertCreateIndex(Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + CreateIndexNode createIndex = new CreateIndexNode(protoNode.getNodeId()); + + PlanProto.CreateIndexNode createIndexProto = protoNode.getCreateIndex(); + createIndex.setIndexName(createIndexProto.getIndexName()); + createIndex.setIndexMethod(createIndexProto.getIndexMethod()); + try { + createIndex.setIndexPath(new URI(createIndexProto.getIndexPath())); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + SortSpec[] keySortSpecs = new SortSpec[createIndexProto.getKeySortSpecsCount()]; + for (int i = 0; i < keySortSpecs.length; i++) { + keySortSpecs[i] = new SortSpec(createIndexProto.getKeySortSpecs(i)); + } + createIndex.setKeySortSpecs(new Schema(createIndexProto.getTargetRelationSchema()), + keySortSpecs); + createIndex.setUnique(createIndexProto.getIsUnique()); + createIndex.setClustered(createIndexProto.getIsClustered()); + if (createIndexProto.hasIndexProperties()) { + createIndex.setOptions(new KeyValueSet(createIndexProto.getIndexProperties())); + } + createIndex.setChild(nodeMap.get(createIndexProto.getChildSeq())); + createIndex.setInSchema(convertSchema(protoNode.getInSchema())); + createIndex.setOutSchema(convertSchema(protoNode.getOutSchema())); + createIndex.setExternal(createIndexProto.getIsExternal()); + + return createIndex; + } + + private static DropIndexNode convertDropIndex(PlanProto.LogicalNode protoNode) { + DropIndexNode dropIndex = new DropIndexNode(protoNode.getNodeId()); + + PlanProto.DropIndexNode dropIndexProto = protoNode.getDropIndex(); + dropIndex.setIndexName(dropIndexProto.getIndexName()); + + return dropIndex; + } + private static AggregationFunctionCallEval [] convertAggFuncCallEvals(OverridableConf context, EvalContext evalContext, List<PlanProto.EvalNodeTree> evalTrees) { AggregationFunctionCallEval [] aggFuncs = new AggregationFunctionCallEval[evalTrees.size()]; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index 0207932..fe69fc1 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -21,13 +21,16 @@ package org.apache.tajo.plan.serder; import com.google.common.collect.Maps; import org.apache.hadoop.fs.Path; import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.AddColumn; import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameColumn; import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameTable; @@ -105,6 +108,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe private LogicalNodeTree.Builder treeBuilder = LogicalNodeTree.newBuilder(); } + @Override public LogicalNode visitRoot(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode root, Stack<LogicalNode> stack) throws TajoException { super.visitRoot(context, plan, block, root, stack); @@ -139,6 +143,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return node; } + @Override public LogicalNode visitEvalExpr(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, EvalExprNode exprEval, Stack<LogicalNode> stack) throws TajoException { PlanProto.EvalExprNode.Builder exprEvalBuilder = PlanProto.EvalExprNode.newBuilder(); @@ -152,6 +157,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return exprEval; } + @Override public LogicalNode visitProjection(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ProjectionNode projection, Stack<LogicalNode> stack) throws TajoException { super.visitProjection(context, plan, block, projection, stack); @@ -189,6 +195,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return limit; } + @Override public LogicalNode visitWindowAgg(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode windowAgg, Stack<LogicalNode> stack) throws TajoException { super.visitWindowAgg(context, plan, block, windowAgg, stack); @@ -263,6 +270,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return having; } + @Override public LogicalNode visitGroupBy(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node, Stack<LogicalNode> stack) throws TajoException { super.visitGroupBy(context, plan, block, node, new Stack<LogicalNode>()); @@ -298,6 +306,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return nodeBuilder; } + @Override public LogicalNode visitDistinctGroupby(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node, Stack<LogicalNode> stack) throws TajoException { super.visitDistinctGroupby(context, plan, block, node, new Stack<LogicalNode>()); @@ -354,6 +363,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return filter; } + @Override public LogicalNode visitJoin(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode join, Stack<LogicalNode> stack) throws TajoException { super.visitJoin(context, plan, block, join, stack); @@ -438,6 +448,26 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe } @Override + public LogicalNode visitIndexScan(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + IndexScanNode node, Stack<LogicalNode> stack) throws TajoException { + + PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node); + + PlanProto.IndexScanSpec.Builder indexScanSpecBuilder = PlanProto.IndexScanSpec.newBuilder(); + indexScanSpecBuilder.setKeySchema(node.getKeySchema().getProto()); + indexScanSpecBuilder.setIndexPath(node.getIndexPath().toString()); + for (SimplePredicate predicate : node.getPredicates()) { + indexScanSpecBuilder.addPredicates(predicate.getProto()); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setScan(scanBuilder); + nodeBuilder.setIndexScan(indexScanSpecBuilder); + context.treeBuilder.addNodes(nodeBuilder); + return node; + } + + @Override public LogicalNode visitPartitionedTableScan(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws TajoException { @@ -461,6 +491,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return node; } + @Override public LogicalNode visitTableSubQuery(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, TableSubQueryNode node, Stack<LogicalNode> stack) throws TajoException { super.visitTableSubQuery(context, plan, block, node, stack); @@ -483,6 +514,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return node; } + @Override public LogicalNode visitCreateTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node, Stack<LogicalNode> stack) throws TajoException { super.visitCreateTable(context, plan, block, node, stack); @@ -623,6 +655,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return node; } + @Override public LogicalNode visitInsert(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node, Stack<LogicalNode> stack) throws TajoException { super.visitInsert(context, plan, block, node, stack); @@ -711,6 +744,48 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe return node; } + @Override + public LogicalNode visitCreateIndex(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException { + super.visitCreateIndex(context, plan, block, node, new Stack<LogicalNode>()); + + PlanProto.CreateIndexNode.Builder createIndexBuilder = PlanProto.CreateIndexNode.newBuilder(); + int [] childIds = registerGetChildIds(context, node); + createIndexBuilder.setChildSeq(childIds[0]); + createIndexBuilder.setIndexName(node.getIndexName()); + createIndexBuilder.setIndexMethod(node.getIndexMethod()); + createIndexBuilder.setIndexPath(node.getIndexPath().toString()); + for (SortSpec sortSpec : node.getKeySortSpecs()) { + createIndexBuilder.addKeySortSpecs(sortSpec.getProto()); + } + createIndexBuilder.setTargetRelationSchema(node.getTargetRelationSchema().getProto()); + createIndexBuilder.setIsUnique(node.isUnique()); + createIndexBuilder.setIsClustered(node.isClustered()); + if (node.hasOptions()) { + createIndexBuilder.setIndexProperties(node.getOptions().getProto()); + } + createIndexBuilder.setIsExternal(node.isExternal()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setCreateIndex(createIndexBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitDropIndex(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DropIndexNode node, Stack<LogicalNode> stack) { + PlanProto.DropIndexNode.Builder dropIndexBuilder = PlanProto.DropIndexNode.newBuilder(); + dropIndexBuilder.setIndexName(node.getIndexName()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setDropIndex(dropIndexBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + public static PlanProto.NodeType convertType(NodeType type) { return PlanProto.NodeType.valueOf(type.name()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java new file mode 100644 index 0000000..73b33d5 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.util; + +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.plan.expr.*; + +import java.util.LinkedList; +import java.util.List; +import java.util.Stack; + +public class IndexUtil { + + public static String getIndexName(String indexName , SortSpec[] keys) { + StringBuilder builder = new StringBuilder(); + builder.append(indexName + "_"); + for(int i = 0 ; i < keys.length ; i ++) { + builder.append(keys[i].getSortKey().getSimpleName() + "_"); + } + return builder.toString(); + } + + public static List<EvalNode> getAllEqualEvals(EvalNode qual) { + EvalTreeUtil.EvalFinder finder = new EvalTreeUtil.EvalFinder(EvalType.EQUAL); + finder.visitChild(null, qual, new Stack<EvalNode>()); + return finder.getEvalNodes(); + } + + private static class FieldAndValueFinder implements EvalNodeVisitor { + private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>(); + + public LinkedList<BinaryEval> getNodeList () { + return this.nodeList; + } + + @Override + public void visit(EvalNode node) { + BinaryEval binaryEval = (BinaryEval) node; + switch(node.getType()) { + case AND: + break; + case EQUAL: + if( binaryEval.getLeftExpr().getType() == EvalType.FIELD + && binaryEval.getRightExpr().getType() == EvalType.CONST ) { + nodeList.add(binaryEval); + } + break; + case IS_NULL: + if( binaryEval.getLeftExpr().getType() == EvalType.FIELD + && binaryEval.getRightExpr().getType() == EvalType.CONST) { + nodeList.add(binaryEval); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 9c34826..c4a4367 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -69,10 +69,31 @@ public class PlannerUtil { type == NodeType.CREATE_DATABASE || type == NodeType.DROP_DATABASE || (type == NodeType.CREATE_TABLE && !((CreateTableNode) baseNode).hasSubQuery()) || - baseNode.getType() == NodeType.DROP_TABLE || - baseNode.getType() == NodeType.ALTER_TABLESPACE || - baseNode.getType() == NodeType.ALTER_TABLE || - baseNode.getType() == NodeType.TRUNCATE_TABLE; + type == NodeType.DROP_TABLE || + type == NodeType.ALTER_TABLESPACE || + type == NodeType.ALTER_TABLE || + type == NodeType.TRUNCATE_TABLE || + type == NodeType.CREATE_INDEX || + type == NodeType.DROP_INDEX; + } + + /** + * Most update queries require only the updates to the catalog information, + * but some queries such as "CREATE INDEX" or CTAS requires distributed execution on multiple cluster nodes. + * This function checks whether the given DDL plan requires distributed execution or not. + * @param node the root node of a query plan + * @return Return true if the input query plan requires distributed execution. Otherwise, return false. + */ + public static boolean isDistExecDDL(LogicalNode node) { + LogicalNode baseNode = node; + if (node instanceof LogicalRootNode) { + baseNode = ((LogicalRootNode) node).getChild(); + } + + NodeType type = baseNode.getType(); + + return type == NodeType.CREATE_INDEX && !((CreateIndexNode)baseNode).isExternal() || + type == NodeType.CREATE_TABLE && ((CreateTableNode)baseNode).hasSubQuery(); } /** @@ -386,6 +407,12 @@ public class PlannerUtil { QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack) { return node; } + + @Override + public LogicalNode visitIndexScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + IndexScanNode node, Stack<LogicalNode> stack) throws TajoException { + return node; + } } public static void replaceNode(LogicalNode plan, LogicalNode newNode, NodeType type) { http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java index ebb5c80..2c8f344 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java @@ -21,7 +21,6 @@ package org.apache.tajo.plan.visitor; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; import java.util.Stack; @@ -113,6 +112,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi case PARTITIONS_SCAN: current = visitPartitionedTableScan(context, plan, block, (PartitionedTableScanNode) node, stack); break; + case INDEX_SCAN: + current = visitIndexScan(context, plan, block, (IndexScanNode) node, stack); + break; case STORE: current = visitStoreTable(context, plan, block, (StoreTableNode) node, stack); break; @@ -137,9 +139,15 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi case ALTER_TABLE: current = visitAlterTable(context, plan, block, (AlterTableNode) node, stack); break; + case CREATE_INDEX: + current = visitCreateIndex(context, plan, block, (CreateIndexNode) node, stack); + break; case TRUNCATE_TABLE: current = visitTruncateTable(context, plan, block, (TruncateTableNode) node, stack); break; + case DROP_INDEX: + current = visitDropIndex(context, plan, block, (DropIndexNode) node, stack); + break; default: throw new TajoInternalError("Unknown logical node type: " + node.getType()); } @@ -312,6 +320,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi } @Override + public RESULT visitIndexScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node, + Stack<LogicalNode> stack) throws TajoException { + return null; + } + + @Override public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws TajoException { @@ -379,6 +393,22 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi } @Override + public RESULT visitCreateIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateIndexNode node, + Stack<LogicalNode> stack) throws TajoException { + RESULT result = null; + stack.push(node); + result = visit(context, plan, block, node.getChild(), stack); + stack.pop(); + return result; + } + + @Override + public RESULT visitDropIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropIndexNode node, + Stack<LogicalNode> stack) { + return null; + } + + @Override public RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, TruncateTableNode node, Stack<LogicalNode> stack) throws TajoException { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java index 925742a..f826479 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java @@ -22,7 +22,6 @@ import org.apache.tajo.annotation.Nullable; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanString; -import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; import java.util.Stack; @@ -235,6 +234,19 @@ public class ExplainLogicalPlanVisitor extends BasicLogicalPlanVisitor<ExplainLo return node; } + @Override + public LogicalNode visitCreateIndex(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, + CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException { + return visitUnaryNode(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitDropIndex(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DropIndexNode node, Stack<LogicalNode> stack) { + context.add(context.depth, node.getPlanString()); + return node; + } + public static String printDepthString(int maxDepth, DepthString planStr) { StringBuilder output = new StringBuilder(); String pad = new String(new char[planStr.getDepth() * 3]).replace('\0', ' '); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java index dd832ae..9e9389d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java @@ -20,7 +20,6 @@ package org.apache.tajo.plan.visitor; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; import java.util.Stack; @@ -76,6 +75,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> { RESULT visitScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node, Stack<LogicalNode> stack) throws TajoException; + RESULT visitIndexScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node, + Stack<LogicalNode> stack) throws TajoException; + RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws TajoException; @@ -103,6 +105,12 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> { RESULT visitAlterTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, AlterTableNode node, Stack<LogicalNode> stack) throws TajoException; + RESULT visitCreateIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateIndexNode node, + Stack<LogicalNode> stack) throws TajoException; + + RESULT visitDropIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropIndexNode node, + Stack<LogicalNode> stack) throws TajoException; + RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, TruncateTableNode node, Stack<LogicalNode> stack) throws TajoException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java index 989af88..4854d7f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java @@ -20,7 +20,6 @@ package org.apache.tajo.plan.visitor; import org.apache.tajo.algebra.*; import org.apache.tajo.exception.TajoException; -import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.algebra.BaseAlgebraVisitor; import java.util.Stack; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 537d180..4019322 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -46,7 +46,7 @@ enum NodeType { TABLE_SUBQUERY = 15; SCAN = 16; PARTITIONS_SCAN = 17; - BST_INDEX_SCAN = 18; + INDEX_SCAN = 18; STORE = 19; INSERT = 20; @@ -57,6 +57,8 @@ enum NodeType { ALTER_TABLESPACE = 25; ALTER_TABLE = 26; TRUNCATE_TABLE = 27; + CREATE_INDEX = 28; + DROP_INDEX = 29; } message LogicalNodeTree { @@ -72,31 +74,35 @@ message LogicalNode { optional ScanNode scan = 6; optional PartitionScanSpec partitionScan = 7; - optional JoinNode join = 8; - optional FilterNode filter = 9; - optional GroupbyNode groupby = 10; - optional DistinctGroupbyNode distinctGroupby = 11; - optional SortNode sort = 12; - optional LimitNode limit = 13; - optional WindowAggNode windowAgg = 14; - optional ProjectionNode projection = 15; - optional EvalExprNode exprEval = 16; - optional UnionNode union = 17; - optional TableSubQueryNode tableSubQuery = 18; - optional PersistentStoreNode persistentStore = 19; - optional StoreTableNodeSpec storeTable = 20; - optional InsertNodeSpec insert = 21; - optional CreateTableNodeSpec createTable = 22; - optional RootNode root = 23; - optional SetSessionNode setSession = 24; - - optional CreateDatabaseNode createDatabase = 25; - optional DropDatabaseNode dropDatabase = 26; - optional DropTableNode dropTable = 27; - - optional AlterTablespaceNode alterTablespace = 28; - optional AlterTableNode alterTable = 29; - optional TruncateTableNode truncateTableNode = 30; + optional IndexScanSpec indexScan = 8; + optional JoinNode join = 9; + optional FilterNode filter = 10; + optional GroupbyNode groupby = 11; + optional DistinctGroupbyNode distinctGroupby = 12; + optional SortNode sort = 13; + optional LimitNode limit = 14; + optional WindowAggNode windowAgg = 15; + optional ProjectionNode projection = 16; + optional EvalExprNode exprEval = 17; + optional UnionNode union = 18; + optional TableSubQueryNode tableSubQuery = 19; + optional PersistentStoreNode persistentStore = 20; + optional StoreTableNodeSpec storeTable = 21; + optional InsertNodeSpec insert = 22; + optional CreateTableNodeSpec createTable = 23; + optional RootNode root = 24; + optional SetSessionNode setSession = 25; + + optional CreateDatabaseNode createDatabase = 26; + optional DropDatabaseNode dropDatabase = 27; + optional DropTableNode dropTable = 28; + + optional AlterTablespaceNode alterTablespace = 29; + optional AlterTableNode alterTable = 30; + optional TruncateTableNode truncateTableNode = 31; + + optional CreateIndexNode createIndex = 32; + optional DropIndexNode dropIndex = 33; } message ScanNode { @@ -112,6 +118,17 @@ message PartitionScanSpec { repeated string paths = 1; } +message IndexScanSpec { + required SchemaProto keySchema = 1; + required string indexPath = 2; + repeated SimplePredicateProto predicates = 3; +} + +message SimplePredicateProto { + required SortSpecProto keySortSpec = 1; + required Datum value = 2; +} + message FilterNode { required int32 childSeq = 1; required EvalNodeTree qual = 2; @@ -314,6 +331,23 @@ message AlterTableNode { optional AlterPartition alterPartition = 7; } +message CreateIndexNode { + required int32 childSeq = 1; + required string indexName = 2; + required IndexMethod indexMethod = 3; + required string indexPath = 4; + repeated SortSpecProto keySortSpecs = 5; + required SchemaProto targetRelationSchema = 6; + optional bool isUnique = 7 [default = false]; + optional bool isClustered = 8 [default = false]; + optional KeyValueSetProto indexProperties = 9; + optional bool isExternal = 10; +} + +message DropIndexNode { + required string indexName = 1; +} + enum EvalType { NOT = 0; AND = 1; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java index ef33a8e..d12c6bd 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java @@ -174,22 +174,6 @@ public class OldStorageManager { } /** - * Returns Scanner instance. - * - * @param conf The system property - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target The output schema - * @return Scanner instance - * @throws IOException - */ - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { - return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); - } - - /** * Creates a scanner instance. * * @param theClass Concrete class of scanner http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index f8883b0..4d7fac4 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -268,6 +268,21 @@ public abstract class Tablespace { } /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws IOException + */ + public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment, + Schema target) throws IOException { + return (SeekableScanner)this.getScanner(meta, schema, fragment, target); + } + + /** * Returns Appender instance. * @param queryContext Query property. * @param taskAttemptId Task id. http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 00cac77..3060d53 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -49,6 +49,7 @@ import org.apache.tajo.plan.logical.CreateTableNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.*; @@ -1091,8 +1092,8 @@ public class HBaseTablespace extends Tablespace { @Override public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoException { - if (REWRITE_RULE.isEligible(context, plan)) { - REWRITE_RULE.rewrite(context, plan); + if (REWRITE_RULE.isEligible(new LogicalPlanRewriteRuleContext(context, plan))) { + REWRITE_RULE.rewrite(new LogicalPlanRewriteRuleContext(context, plan)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java index db9f3c8..40789ac 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java @@ -18,7 +18,6 @@ package org.apache.tajo.storage.hbase; -import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -28,6 +27,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.util.KeyValueSet; import java.io.IOException; @@ -46,9 +46,9 @@ public class SortedInsertRewriter implements LogicalPlanRewriteRule { } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { - boolean hbaseMode = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false")); - LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + public boolean isEligible(LogicalPlanRewriteRuleContext context) { + boolean hbaseMode = "false".equalsIgnoreCase(context.getQueryContext().get(HBaseStorageConstants.INSERT_PUT_MODE, "false")); + LogicalRootNode rootNode = context.getPlan().getRootBlock().getRoot(); LogicalNode node = rootNode.getChild(); return hbaseMode && node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT; } @@ -69,7 +69,8 @@ public class SortedInsertRewriter implements LogicalPlanRewriteRule { } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { + LogicalPlan plan = context.getPlan(); LogicalRootNode rootNode = plan.getRootBlock().getRoot(); StoreTableNode storeTable = rootNode.getChild(); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index ed010c0..5437b0d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -124,7 +124,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -147,7 +148,8 @@ public class TestBSTIndex { tuple = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp); reader.open(); - scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { @@ -226,7 +228,8 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), keySchema, comp); reader.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { @@ -289,7 +292,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -361,7 +365,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -384,7 +389,8 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), keySchema, comp); reader.open(); - scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple result; @@ -452,7 +458,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -475,7 +482,8 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), keySchema, comp); reader.open(); - scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple result; @@ -532,7 +540,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -557,7 +566,8 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), keySchema, comp); reader.open(); - scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); tuple.put(0, DatumFactory.createInt8(0)); @@ -616,7 +626,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -722,7 +733,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -804,7 +816,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -829,7 +842,8 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), keySchema, comp); reader.open(); - scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); for (int i = (TUPLE_NUM - 1); i > 0; i--) { @@ -894,7 +908,8 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple keyTuple; @@ -922,7 +937,8 @@ public class TestBSTIndex { assertEquals(keySchema, reader.getKeySchema()); assertEquals(comp, reader.getComparator()); - scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); Tuple result; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index c198965..a001492 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -111,7 +111,8 @@ public class TestSingleCSVFileBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()) + .getSeekableScanner(meta, schema, tablet.getProto(), schema); fileScanner.init(); Tuple keyTuple; long offset; @@ -135,7 +136,8 @@ public class TestSingleCSVFileBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp); reader.open(); - fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + fileScanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()) + .getSeekableScanner(meta, schema, tablet.getProto(), schema); fileScanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { tuple.put(0, DatumFactory.createInt8(i)); @@ -200,7 +202,8 @@ public class TestSingleCSVFileBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()) + .getSeekableScanner(meta, schema, tablet.getProto(), schema); fileScanner.init(); Tuple keyTuple; long offset; @@ -221,7 +224,8 @@ public class TestSingleCSVFileBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); reader.open(); - fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + fileScanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()) + .getSeekableScanner(meta, schema, tablet.getProto(), schema); fileScanner.init(); Tuple result; for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
