TAJO-269: Protocol buffer De/Serialization for LogicalNode. Closes #322
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/32be38d4 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/32be38d4 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/32be38d4 Branch: refs/heads/master Commit: 32be38d41affc498b01286938f3fea89a8def1a9 Parents: 6fde9e5 Author: Hyunsik Choi <[email protected]> Authored: Tue Dec 30 21:52:53 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Tue Dec 30 21:53:29 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/catalog/Schema.java | 33 +- .../java/org/apache/tajo/catalog/TableDesc.java | 2 +- .../java/org/apache/tajo/conf/TajoConf.java | 20 + .../java/org/apache/tajo/util/ProtoUtil.java | 19 + .../org/apache/tajo/util/ReflectionUtil.java | 61 +- .../main/java/org/apache/tajo/util/TUtil.java | 41 ++ .../engine/codegen/ExecutorPreCompiler.java | 8 +- .../engine/planner/PhysicalPlannerImpl.java | 17 +- .../tajo/engine/planner/enforce/Enforcer.java | 12 +- .../engine/planner/global/GlobalPlanner.java | 36 +- .../global/builder/DistinctGroupbyBuilder.java | 32 +- .../BaseGlobalPlanRewriteRuleProvider.java | 39 + .../rewriter/GlobalPlanRewriteEngine.java | 84 +++ .../global/rewriter/GlobalPlanRewriteRule.java | 49 ++ .../rewriter/GlobalPlanRewriteRuleProvider.java | 33 + .../rewriter/GlobalPlanTestRuleProvider.java | 44 ++ .../rules/GlobalPlanEqualityTester.java | 63 ++ .../DistinctGroupbyFirstAggregationExec.java | 2 +- .../DistinctGroupbyHashAggregationExec.java | 4 +- .../DistinctGroupbySecondAggregationExec.java | 2 +- .../DistinctGroupbySortAggregationExec.java | 4 +- .../DistinctGroupbyThirdAggregationExec.java | 2 +- .../apache/tajo/engine/query/TaskRequest.java | 3 +- .../tajo/engine/query/TaskRequestImpl.java | 28 +- .../utils/test/ErrorInjectionRewriter.java | 10 +- .../tajo/master/DefaultTaskScheduler.java | 12 +- .../org/apache/tajo/master/GlobalEngine.java | 1 + .../apache/tajo/master/exec/QueryExecutor.java | 6 +- .../master/querymaster/QueryMasterTask.java | 9 +- .../main/java/org/apache/tajo/worker/Task.java | 4 +- .../src/main/proto/TajoWorkerProtocol.proto | 14 +- .../org/apache/tajo/TajoTestingCluster.java | 15 +- .../apache/tajo/engine/eval/ExprTestBase.java | 14 +- .../tajo/engine/query/TestGroupByQuery.java | 59 +- .../tajo/engine/query/TestSelectQuery.java | 27 +- .../tajo/engine/query/TestTruncateTable.java | 8 +- .../tajo/engine/query/TestWindowQuery.java | 6 +- .../apache/tajo/master/TestGlobalPlanner.java | 6 +- .../org/apache/tajo/plan/LogicalOptimizer.java | 52 +- .../tajo/plan/LogicalPlanPreprocessor.java | 4 +- .../org/apache/tajo/plan/LogicalPlanner.java | 13 +- .../main/java/org/apache/tajo/plan/Target.java | 13 +- .../plan/expr/AggregationFunctionCallEval.java | 38 +- .../org/apache/tajo/plan/expr/EvalNode.java | 10 +- .../tajo/plan/expr/WindowFunctionEval.java | 12 + .../tajo/plan/logical/AlterTableNode.java | 10 + .../tajo/plan/logical/AlterTablespaceNode.java | 13 +- .../apache/tajo/plan/logical/BinaryNode.java | 16 + .../tajo/plan/logical/CreateDatabaseNode.java | 10 + .../tajo/plan/logical/CreateTableNode.java | 11 +- .../tajo/plan/logical/DistinctGroupbyNode.java | 39 +- .../tajo/plan/logical/DropDatabaseNode.java | 17 +- .../apache/tajo/plan/logical/DropTableNode.java | 10 + .../apache/tajo/plan/logical/EvalExprNode.java | 12 +- .../apache/tajo/plan/logical/GroupbyNode.java | 47 +- .../apache/tajo/plan/logical/InsertNode.java | 15 +- .../apache/tajo/plan/logical/LogicalNode.java | 16 +- .../org/apache/tajo/plan/logical/NodeType.java | 6 +- .../tajo/plan/logical/ProjectionNode.java | 12 +- .../apache/tajo/plan/logical/RelationNode.java | 2 +- .../org/apache/tajo/plan/logical/ScanNode.java | 13 +- .../tajo/plan/logical/SetSessionNode.java | 22 +- .../tajo/plan/logical/StoreTableNode.java | 11 +- .../tajo/plan/logical/TableSubQueryNode.java | 12 +- .../tajo/plan/logical/TruncateTableNode.java | 10 + .../org/apache/tajo/plan/logical/UnaryNode.java | 14 + .../apache/tajo/plan/logical/WindowSpec.java | 57 +- .../tajo/plan/nameresolver/NameResolver.java | 6 +- .../plan/nameresolver/ResolverByLegacy.java | 2 +- .../rewrite/BaseLogicalPlanRewriteEngine.java | 89 +++ .../BaseLogicalPlanRewriteRuleProvider.java | 59 ++ .../plan/rewrite/BasicQueryRewriteEngine.java | 72 -- .../plan/rewrite/LogicalPlanRewriteEngine.java | 33 + .../plan/rewrite/LogicalPlanRewriteRule.java | 57 ++ .../rewrite/LogicalPlanRewriteRuleProvider.java | 44 ++ .../rewrite/LogicalPlanTestRuleProvider.java | 44 ++ .../tajo/plan/rewrite/QueryRewriteEngine.java | 32 - .../apache/tajo/plan/rewrite/RewriteRule.java | 56 -- .../plan/rewrite/rules/FilterPushDownRule.java | 9 +- .../rules/LogicalPlanEqualityTester.java | 55 ++ .../rewrite/rules/PartitionedTableRewriter.java | 44 +- .../rewrite/rules/ProjectionPushDownRule.java | 11 +- .../tajo/plan/serder/EvalNodeDeserializer.java | 301 ++++++++ .../tajo/plan/serder/EvalNodeSerializer.java | 397 ++++++++++ .../plan/serder/EvalTreeProtoDeserializer.java | 218 ------ .../plan/serder/EvalTreeProtoSerializer.java | 310 -------- .../plan/serder/LogicalNodeDeserializer.java | 678 +++++++++++++++++ .../tajo/plan/serder/LogicalNodeSerializer.java | 724 +++++++++++++++++++ .../apache/tajo/plan/serder/package-info.java | 23 + .../org/apache/tajo/plan/util/PlannerUtil.java | 16 +- .../plan/visitor/BasicLogicalPlanVisitor.java | 39 +- .../plan/visitor/ExplainLogicalPlanVisitor.java | 5 +- .../tajo/plan/visitor/LogicalPlanVisitor.java | 7 +- tajo-plan/src/main/proto/Plan.proto | 363 ++++++++-- .../org/apache/tajo/storage/StorageManager.java | 4 +- .../storage/hbase/AddSortForInsertRewriter.java | 10 +- .../tajo/storage/hbase/HBaseStorageManager.java | 6 +- 98 files changed, 3960 insertions(+), 1102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 013bb25..25d6e55 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,8 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-269: Protocol buffer De/Serialization for LogicalNode. (hyunsik) + TAJO-1266: Too many logs when writing a parquet relation. (DaeMyung Kang via jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java index 672b8e3..71c1b01 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java @@ -199,24 +199,21 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject { } public int getColumnId(String name) { - String [] parts = name.split("\\."); - if (parts.length == 2 || parts.length == 3) { - if (fieldsByQualifiedName.containsKey(name)) { - return fieldsByQualifiedName.get(name); - } else { - return -1; - } - } else { - List<Integer> list = fieldsByName.get(name); - if (list == null) { - return -1; - } else if (list.size() == 1) { - return fieldsByName.get(name).get(0); - } else if (list.size() == 0) { - return -1; - } else { // if list.size > 2 - throw throwAmbiguousFieldException(list); - } + // if the same column exists, immediately return that column. + if (fieldsByQualifiedName.containsKey(name)) { + return fieldsByQualifiedName.get(name); + } + + // The following is some workaround code. + List<Integer> list = fieldsByName.get(name); + if (list == null) { + return -1; + } else if (list.size() == 1) { + return fieldsByName.get(name).get(0); + } else if (list.size() == 0) { + return -1; + } else { // if list.size > 2 + throw throwAmbiguousFieldException(list); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java index ce167e1..ec679f9 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java @@ -168,7 +168,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone boolean eq = tableName.equals(other.tableName); eq = eq && schema.equals(other.schema); eq = eq && meta.equals(other.meta); - eq = eq && uri.equals(other.uri); + eq = eq && TUtil.checkEquals(uri, other.uri); eq = eq && TUtil.checkEquals(partitionMethodDesc, other.partitionMethodDesc); eq = eq && TUtil.checkEquals(external, other.external); return eq && TUtil.checkEquals(stats, other.stats); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index d0c6460..ab11ddd 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -249,6 +249,12 @@ public class TajoConf extends Configuration { TASK_DEFAULT_SIZE("tajo.task.size-mb", 128), // Query and Optimization ------------------------------------------------- + // This class provides a ordered list of logical plan rewrite rule classes. + LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS("tajo.plan.logical.rewriter.provider", + "org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteRuleProvider"), + // This class provides a ordered list of global plan rewrite rule classes. + GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS("tajo.plan.global.rewriter.provider", + "org.apache.tajo.engine.planner.global.rewriter.BaseGlobalPlanRewriteRuleProvider"), EXECUTOR_EXTERNAL_SORT_THREAD_NUM("tajo.executor.external-sort.thread-num", 1), EXECUTOR_EXTERNAL_SORT_FANOUT("tajo.executor.external-sort.fanout-num", 8), @@ -561,6 +567,20 @@ public class TajoConf extends Configuration { setBoolVar(this, var, val); } + public void setClassVar(ConfVars var, Class<?> clazz) { + setVar(var, clazz.getCanonicalName()); + } + + public Class<?> getClassVar(ConfVars var) { + String valueString = getVar(var); + + try { + return getClassByName(valueString); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + public static String getVar(Configuration conf, ConfVars var) { return conf.get(var.varname, var.defaultVal); } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java index dbc987d..f9d759b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java @@ -18,7 +18,11 @@ package org.apache.tajo.util; +import com.google.common.collect.Lists; +import org.apache.tajo.common.ProtoObject; + import java.util.Collection; +import java.util.List; import java.util.Map; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.*; @@ -52,4 +56,19 @@ public class ProtoUtil { public static KeyValueSetProto convertFromMap(Map<String, String> map) { return new KeyValueSet(map).getProto(); } + + /** + * It converts an array of ProtoObjects into Iteratable one. + * + * @param protoObjects + * @param <T> + * @return + */ + public static <T> Iterable<T> toProtoObjects(ProtoObject[] protoObjects) { + List<T> converted = Lists.newArrayList(); + for (int i = 0; i < protoObjects.length; i++) { + converted.add((T) protoObjects[i].getProto()); + } + return converted; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java index eccc61f..e2def69 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java @@ -18,22 +18,71 @@ package org.apache.tajo.util; +import org.apache.tajo.conf.TajoConf; + import java.lang.reflect.Constructor; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class ReflectionUtil { - private static final Class<?>[] EMPTY_ARRAY = new Class[]{}; + private static final Class<?>[] EMPTY_PARAM = new Class[]{}; + private static final Object [] EMPTY_OBJECT = new Object[] {}; + private static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class}; /** - * Cache of constructors for each class. Pins the classes so they + * Caches of constructors for each class. Pins the classes so they * can't be garbage collected until ReflectionUtils can be collected. + * + * EMPTY_CONSTRUCTOR_CACHE keeps classes which don't have any parameterized constructor, and + * CONF_CONSTRUCTOR_CACHE keeps classes which have one constructor to take TajoConf. */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = + private static final Map<Class<?>, Constructor<?>> EMPTY_CONSTRUCTOR_CACHE = + new ConcurrentHashMap<Class<?>, Constructor<?>>(); + private static final Map<Class<?>, Constructor<?>> CONF_CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>(); - public static Object newInstance(Class<?> clazz) - throws InstantiationException, IllegalAccessException { - return clazz.newInstance(); + /** + * Initialize an instance by a given class + * + * @param clazz Class to be initialized + * @return initialized object + */ + public static <T> T newInstance(Class<? extends T> clazz) { + try { + Constructor<?> constructor; + if (EMPTY_CONSTRUCTOR_CACHE.containsKey(clazz)) { + constructor = EMPTY_CONSTRUCTOR_CACHE.get(clazz); + } else { + constructor = clazz.getConstructor(EMPTY_PARAM); + EMPTY_CONSTRUCTOR_CACHE.put(clazz, constructor); + } + + return (T) constructor.newInstance(EMPTY_OBJECT); + } catch (Throwable t) { + throw new RuntimeException(t); + } } + + /** + * Initialize an instance by a given class with TajoConf parameter + * + * @param clazz Class to be initialized + * @param conf TajoConf instance + * @return initialized object + */ + public static <T> T newInstance(Class<? extends T> clazz, TajoConf conf) { + try { + Constructor<?> constructor; + if (CONF_CONSTRUCTOR_CACHE.containsKey(clazz)) { + constructor = CONF_CONSTRUCTOR_CACHE.get(clazz); + } else { + constructor = clazz.getConstructor(CONF_PARAM); + CONF_CONSTRUCTOR_CACHE.put(clazz, constructor); + } + + return (T) constructor.newInstance(new Object[]{conf}); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java index 0ceb2b2..a1de860 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java @@ -42,6 +42,37 @@ public class TUtil { } /** + * check two collections as equals. It also check the equivalence of null. + * It will return true even if they are all null. + * + * @param s1 the first collection to be compared. + * @param s2 the second collection to be compared + * @return true if they are equal or all null + */ + public static boolean checkEquals(Collection<?> s1, Collection<?> s2) { + if (s1 == null ^ s2 == null) { + return false; + } else if (s1 == null && s2 == null) { + return true; + } else { + if (s1.size() == 0 && s2.size() == 0) { + return true; + } else if (s1.size() == s2.size()) { + Iterator<?> it1 = s1.iterator(); + Iterator<?> it2 = s2.iterator(); + Object o1; + Object o2; + for (o1 = it1.next(), o2 = it2.next(); it1.hasNext() && it2.hasNext(); o1 = it1.next(), o2 = it2.next()) { + if (!o1.equals(o2)) { + return false; + } + } + } + return true; + } + } + + /** * check two arrays as equals. It also check the equivalence of null. * It will return true even if they are all null. * @@ -59,6 +90,16 @@ public class TUtil { } } + public static boolean checkEquals(int [] s1, int [] s2) { + if (s1 == null ^ s2 == null) { + return false; + } else if (s1 == null && s2 == null) { + return true; + } else { + return Arrays.equals(s1, s2); + } + } + public static <T> T[] concat(T[] first, T[] second) { T[] result = Arrays.copyOf(first, first.length + second.length); System.arraycopy(second, 0, result, first.length, second.length); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java index d588e7f..79513dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java @@ -149,9 +149,9 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp return node; } - public LogicalNode visitDistinct(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, - DistinctGroupbyNode node, Stack<LogicalNode> stack) throws PlanningException { - super.visitDistinct(context, plan, block, node, stack); + public LogicalNode visitDistinctGroupby(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DistinctGroupbyNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitDistinctGroupby(context, plan, block, node, stack); compileProjectableNode(context, node.getInSchema(), node); return node; @@ -190,7 +190,7 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp if (node.hasTargets()) { for (Target target : node.getTargets()) { - compileIfAbsent(context, node.getTableSchema(), target.getEvalTree()); + compileIfAbsent(context, node.getLogicalSchema(), target.getEvalTree()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 2a34637..d043a27 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -34,6 +34,7 @@ import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.serder.LogicalNodeDeserializer; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.physical.*; @@ -877,7 +878,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput(); boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName()); - SortSpec [] sortSpecs = PlannerUtil.convertSortSpecs(sortEnforcer.getSortSpecsList()); + SortSpec [] sortSpecs = LogicalNodeDeserializer.convertSortSpecs(sortEnforcer.getSortSpecsList()); return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs); } else { return false; @@ -1089,7 +1090,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { if (phase == 3) { sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn())); } - for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) { + for (GroupbyNode eachGroupbyNode: distinctNode.getSubPlans()) { for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) { sortSpecs.add(new SortSpec(eachColumn)); } @@ -1110,7 +1111,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { private PhysicalExec createSortAggregationDistinctGroupbyExec(TaskAttemptContext ctx, DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp, DistinctGroupbyEnforcer enforcer) throws IOException { - List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getGroupByNodes(); + List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getSubPlans(); SortAggregateExec[] sortAggregateExec = new SortAggregateExec[groupbyNodes.size()]; @@ -1216,15 +1217,15 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { List<EnforceProperty> properties = enforcer.getEnforceProperties(type); EnforceProperty found = null; for (EnforceProperty property : properties) { - if (type == EnforceType.JOIN && property.getJoin().getPid() == node.getPID()) { + if (type == EnforceType.JOIN && property.getJoin().getNodeId() == node.getPID()) { found = property; - } else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) { + } else if (type == EnforceType.GROUP_BY && property.getGroupby().getNodeId() == node.getPID()) { found = property; - } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getPid() == node.getPID()) { + } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getNodeId() == node.getPID()) { found = property; - } else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) { + } else if (type == EnforceType.SORT && property.getSort().getNodeId() == node.getPID()) { found = property; - } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getPid() == node.getPID()) { + } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getNodeId() == node.getPID()) { found = property; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index e2d7744..8128390 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -98,7 +98,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> { public void enforceJoinAlgorithm(int pid, JoinEnforce.JoinAlgorithm algorithm) { EnforceProperty.Builder builder = newProperty(); JoinEnforce.Builder enforce = JoinEnforce.newBuilder(); - enforce.setPid(pid); + enforce.setNodeId(pid); enforce.setAlgorithm(algorithm); builder.setType(EnforceType.JOIN); @@ -109,7 +109,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> { public void enforceSortAggregation(int pid, @Nullable SortSpec[] sortSpecs) { EnforceProperty.Builder builder = newProperty(); GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder(); - enforce.setPid(pid); + enforce.setNodeId(pid); enforce.setAlgorithm(GroupbyAlgorithm.SORT_AGGREGATION); if (sortSpecs != null) { for (SortSpec sortSpec : sortSpecs) { @@ -125,7 +125,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> { public void enforceHashAggregation(int pid) { EnforceProperty.Builder builder = newProperty(); GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder(); - enforce.setPid(pid); + enforce.setNodeId(pid); enforce.setAlgorithm(GroupbyAlgorithm.HASH_AGGREGATION); builder.setType(EnforceType.GROUP_BY); @@ -146,7 +146,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> { List<SortSpecArray> sortSpecArrays) { EnforceProperty.Builder builder = newProperty(); DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder(); - enforce.setPid(pid); + enforce.setNodeId(pid); enforce.setIsMultipleAggregation(isMultipleAggregation); enforce.setAlgorithm(algorithm); if (sortSpecArrays != null) { @@ -164,7 +164,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> { public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) { EnforceProperty.Builder builder = newProperty(); SortEnforce.Builder enforce = SortEnforce.newBuilder(); - enforce.setPid(pid); + enforce.setNodeId(pid); enforce.setAlgorithm(algorithm); builder.setType(EnforceType.SORT); @@ -203,7 +203,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> { public void enforceColumnPartitionAlgorithm(int pid, ColumnPartitionAlgorithm algorithm) { EnforceProperty.Builder builder = newProperty(); ColumnPartitionEnforcer.Builder enforce = ColumnPartitionEnforcer.newBuilder(); - enforce.setPid(pid); + enforce.setNodeId(pid); enforce.setAlgorithm(algorithm); builder.setType(EnforceType.COLUMN_PARTITION); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index c75b348..6c3e3b8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.global; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -36,17 +37,20 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.BroadcastJoinMarkCandidateVisitor; import org.apache.tajo.engine.planner.BroadcastJoinPlanVisitor; import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder; +import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine; +import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider; import org.apache.tajo.exception.InternalException; import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.function.AggFunction; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.ReflectionUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TajoWorker; @@ -54,6 +58,7 @@ import java.io.IOException; import java.util.*; import static org.apache.tajo.conf.TajoConf.ConfVars; +import static org.apache.tajo.conf.TajoConf.ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*; /** @@ -64,34 +69,29 @@ public class GlobalPlanner { private final TajoConf conf; private final CatalogProtos.StoreType storeType; - private CatalogService catalog; - private TajoWorker.WorkerContext workerContext; + private final CatalogService catalog; + private final GlobalPlanRewriteEngine rewriteEngine; + @VisibleForTesting public GlobalPlanner(final TajoConf conf, final CatalogService catalog) throws IOException { this.conf = conf; this.catalog = catalog; this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase()); Preconditions.checkArgument(storeType != null); + + Class<? extends GlobalPlanRewriteRuleProvider> clazz = + (Class<? extends GlobalPlanRewriteRuleProvider>) conf.getClassVar(GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS); + GlobalPlanRewriteRuleProvider provider = ReflectionUtil.newInstance(clazz, conf); + rewriteEngine = new GlobalPlanRewriteEngine(); + rewriteEngine.addRewriteRule(provider.getRules()); } public GlobalPlanner(final TajoConf conf, final TajoWorker.WorkerContext workerContext) throws IOException { - this.conf = conf; - this.workerContext = workerContext; - this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase()); - Preconditions.checkArgument(storeType != null); + this(conf, workerContext.getCatalog()); } - /** - * TODO: this is hack. it must be refactored at TAJO-602. - */ public CatalogService getCatalog() { - if (workerContext.getCatalog() != null) { - return workerContext.getCatalog(); - } else if (catalog != null) { - return catalog; - } else { - throw new IllegalStateException("No Catalog Instance"); - } + return catalog; } public CatalogProtos.StoreType getStoreType() { @@ -163,6 +163,8 @@ public class GlobalPlanner { masterPlan.setTerminal(terminalBlock); LOG.info("\n" + masterPlan.toString()); + + masterPlan = rewriteEngine.rewrite(masterPlan); } private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) { http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 671bb19..5c6e80e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -99,7 +99,7 @@ public class DistinctGroupbyBuilder { DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode); // Set second, third non-distinct aggregation's eval node to field eval - GroupbyNode lastGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size() - 1); + GroupbyNode lastGroupbyNode = secondStageDistinctNode.getSubPlans().get(secondStageDistinctNode.getSubPlans().size() - 1); if (!lastGroupbyNode.isDistinct()) { int index = 0; for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) { @@ -108,7 +108,7 @@ public class DistinctGroupbyBuilder { index++; } } - lastGroupbyNode = thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size() - 1); + lastGroupbyNode = thirdStageDistinctNode.getSubPlans().get(thirdStageDistinctNode.getSubPlans().size() - 1); if (!lastGroupbyNode.isDistinct()) { int index = 0; for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) { @@ -300,11 +300,11 @@ public class DistinctGroupbyBuilder { DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID()); baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{})); - baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns()); + baseDistinctNode.setGroupingColumns(groupbyNode.getGroupingColumns()); baseDistinctNode.setInSchema(groupbyNode.getInSchema()); baseDistinctNode.setChild(groupbyNode.getChild()); - baseDistinctNode.setGroupbyNodes(childGroupbyNodes); + baseDistinctNode.setSubPlans(childGroupbyNodes); return baseDistinctNode; } @@ -468,11 +468,11 @@ public class DistinctGroupbyBuilder { DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID()); baseDistinctNode.setTargets(groupbyNode.getTargets()); - baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns()); + baseDistinctNode.setGroupingColumns(groupbyNode.getGroupingColumns()); baseDistinctNode.setInSchema(groupbyNode.getInSchema()); baseDistinctNode.setChild(groupbyNode.getChild()); - baseDistinctNode.setGroupbyNodes(childGroupbyNodes); + baseDistinctNode.setSubPlans(childGroupbyNodes); return baseDistinctNode; } @@ -529,12 +529,12 @@ public class DistinctGroupbyBuilder { // - Change SecondStage's aggregation expr and target column name. For example: // exprs: (sum(default.lineitem.l_quantity (FLOAT8))) ==> exprs: (sum(?sum_3 (FLOAT8))) int grpIdx = 0; - for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) { - GroupbyNode secondStageGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(grpIdx); + for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getSubPlans()) { + GroupbyNode secondStageGroupbyNode = secondStageDistinctNode.getSubPlans().get(grpIdx); if (firstStageGroupbyNode.isDistinct()) { // FirstStage: Remove aggregation, Set target with only grouping columns - firstStageGroupbyNode.setAggFunctions(null); + firstStageGroupbyNode.setAggFunctions(PlannerUtil.EMPTY_AGG_FUNCS); List<Target> firstGroupbyTargets = new ArrayList<Target>(); for (Column column : firstStageGroupbyNode.getGroupingColumns()) { @@ -614,7 +614,7 @@ public class DistinctGroupbyBuilder { // In the case of distinct query without group by clause // other aggregation function is added to last distinct group by node. - List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getGroupByNodes(); + List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getSubPlans(); GroupbyNode lastSecondStageGroupbyNode = secondStageGroupbyNodes.get(secondStageGroupbyNodes.size() - 1); if (!lastSecondStageGroupbyNode.isDistinct() && lastSecondStageGroupbyNode.isEmptyGrouping()) { GroupbyNode otherGroupbyNode = lastSecondStageGroupbyNode; @@ -644,7 +644,7 @@ public class DistinctGroupbyBuilder { List<Integer> firstStageColumnIds = new ArrayList<Integer>(); columnIdIndex = 0; List<Target> firstTargets = new ArrayList<Target>(); - for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) { + for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getSubPlans()) { if (firstStageGroupbyNode.isDistinct()) { for (Column column : firstStageGroupbyNode.getGroupingColumns()) { Target firstTarget = new Target(new FieldEval(column)); @@ -674,7 +674,7 @@ public class DistinctGroupbyBuilder { Schema secondStageInSchema = new Schema(); //TODO merged tuple schema int index = 0; - for(GroupbyNode eachNode: secondStageDistinctNode.getGroupByNodes()) { + for(GroupbyNode eachNode: secondStageDistinctNode.getSubPlans()) { eachNode.setInSchema(firstStageDistinctNode.getOutSchema()); for (Column column: eachNode.getOutSchema().getColumns()) { if (secondStageInSchema.getColumn(column) == null) { @@ -695,13 +695,13 @@ public class DistinctGroupbyBuilder { List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>(); int index = 0; - for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) { + for (GroupbyNode groupbyNode: firstStageDistinctNode.getSubPlans()) { List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>(); for (Column column: groupbyNode.getGroupingColumns()) { sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build()); } sortSpecArrays.add( SortSpecArray.newBuilder() - .setPid(secondStageDistinctNode.getGroupByNodes().get(index).getPID()) + .setNodeId(secondStageDistinctNode.getSubPlans().get(index).getPID()) .addAllSortSpecs(sortSpecs).build()); } secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(), @@ -723,13 +723,13 @@ public class DistinctGroupbyBuilder { List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>(); int index = 0; - for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) { + for (GroupbyNode groupbyNode: firstStageDistinctNode.getSubPlans()) { List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>(); for (Column column: groupbyNode.getGroupingColumns()) { sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build()); } sortSpecArrays.add( SortSpecArray.newBuilder() - .setPid(thirdStageDistinctNode.getGroupByNodes().get(index).getPID()) + .setNodeId(thirdStageDistinctNode.getSubPlans().get(index).getPID()) .addAllSortSpecs(sortSpecs).build()); } thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(), http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java new file mode 100644 index 0000000..96ee2c6 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global.rewriter; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.TUtil; + +import java.util.Collection; +import java.util.List; + +@SuppressWarnings("unused") +public class BaseGlobalPlanRewriteRuleProvider extends GlobalPlanRewriteRuleProvider { + private static final List<Class<? extends GlobalPlanRewriteRule>> EMPTY_RULES = TUtil.newList(); + + public BaseGlobalPlanRewriteRuleProvider(TajoConf conf) { + super(conf); + } + + @Override + public Collection<Class<? extends GlobalPlanRewriteRule>> getRules() { + return EMPTY_RULES; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java new file mode 100644 index 0000000..c01ed0e --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global.rewriter; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.plan.PlanningException; +import org.apache.tajo.util.ReflectionUtil; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class GlobalPlanRewriteEngine { + /** class logger */ + private static final Log LOG = LogFactory.getLog(GlobalPlanRewriteEngine.class); + + /** a map for query rewrite rules */ + private final Map<String, GlobalPlanRewriteRule> rewriteRules = new LinkedHashMap<String, GlobalPlanRewriteRule>(); + + /** + * Add a query rewrite rule to this engine. + * + * @param rules Rule classes + */ + public void addRewriteRule(Iterable<Class<? extends GlobalPlanRewriteRule>> rules) { + for (Class<? extends GlobalPlanRewriteRule> clazz : rules) { + try { + GlobalPlanRewriteRule rule = ReflectionUtil.newInstance(clazz); + addRewriteRule(rule); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + } + + /** + * Add a query rewrite rule to this engine. + * + * @param rule The rule to be added to this engine. + */ + public void addRewriteRule(GlobalPlanRewriteRule rule) { + if (!rewriteRules.containsKey(rule.getName())) { + rewriteRules.put(rule.getName(), rule); + } + } + + /** + * Rewrite a global plan with all query rewrite rules added to this engine. + * + * @param plan The plan to be rewritten with all query rewrite rule. + * @return The rewritten plan. + */ + public MasterPlan rewrite(MasterPlan plan) throws PlanningException { + GlobalPlanRewriteRule rule; + for (Map.Entry<String, GlobalPlanRewriteRule> rewriteRule : rewriteRules.entrySet()) { + rule = rewriteRule.getValue(); + if (rule.isEligible(plan)) { + plan = rule.rewrite(plan); + if (LOG.isDebugEnabled()) { + LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query."); + } + } + } + + return plan; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java new file mode 100644 index 0000000..4a37207 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global.rewriter; + +import org.apache.tajo.engine.planner.global.MasterPlan; + +/** + * A rewrite rule for global plans + */ +public interface GlobalPlanRewriteRule { + + /** + * Return rule name + * @return Rule name + */ + public abstract String getName(); + + /** + * Check if this rule should be applied. + * + * @param plan Global Plan + * @return + */ + public abstract boolean isEligible(MasterPlan plan); + + /** + * Rewrite a global plan + * + * @param plan Global Plan + * @return + */ + public abstract MasterPlan rewrite(MasterPlan plan); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java new file mode 100644 index 0000000..638b5f3 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global.rewriter; + +import org.apache.tajo.conf.TajoConf; + +import java.util.Collection; + +public abstract class GlobalPlanRewriteRuleProvider { + protected final TajoConf conf; + + public GlobalPlanRewriteRuleProvider(TajoConf conf) { + this.conf = conf; + } + + public abstract Collection<Class<? extends GlobalPlanRewriteRule>> getRules(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java new file mode 100644 index 0000000..dc91577 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global.rewriter; + +import com.google.common.collect.Lists; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanEqualityTester; + +import java.util.Collection; +import java.util.List; + +/** + * It is used only for test. + */ +@SuppressWarnings("unused") +public class GlobalPlanTestRuleProvider extends BaseGlobalPlanRewriteRuleProvider { + + public GlobalPlanTestRuleProvider(TajoConf conf) { + super(conf); + } + + @Override + public Collection<Class<? extends GlobalPlanRewriteRule>> getRules() { + List<Class<? extends GlobalPlanRewriteRule>> injectedRules = Lists.newArrayList(super.getRules()); + injectedRules.add(GlobalPlanEqualityTester.class); + return injectedRules; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java new file mode 100644 index 0000000..e2fd47f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global.rewriter.rules; + +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.serder.LogicalNodeDeserializer; +import org.apache.tajo.plan.serder.LogicalNodeSerializer; +import org.apache.tajo.plan.serder.PlanProto; + +/** + * It verifies the equality between the input and output of LogicalNodeTree(De)Serializer in global planning. + */ +public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule { + + @Override + public String getName() { + return "GlobalPlanEqualityTester"; + } + + @Override + public boolean isEligible(MasterPlan plan) { + return true; + } + + @Override + public MasterPlan rewrite(MasterPlan plan) { + try { + ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); + while (cursor.hasNext()) { + ExecutionBlock eb = cursor.nextBlock(); + LogicalNode node = eb.getPlan(); + if (node != null) { + PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node); + LogicalNode deserialize = LogicalNodeDeserializer.deserialize(plan.getContext(), tree); + assert node.deepEquals(deserialize); + } + } + return plan; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index bd24fa3..aca4879 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -130,7 +130,7 @@ public class DistinctGroupbyFirstAggregationExec extends PhysicalExec { } resultTupleLength = groupingKeyIndexes.length + 1; //1 is Sequence Datum which indicates sequence of DistinctNode. - List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + List<GroupbyNode> groupbyNodes = plan.getSubPlans(); List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>(); int distinctSeq = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index eac5c70..37d61a9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -76,7 +76,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { distinctGroupingKeyIds[idx++] = intVal.intValue(); } - List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + List<GroupbyNode> groupbyNodes = plan.getSubPlans(); groupbyNodeNum = groupbyNodes.size(); this.hashAggregators = new HashAggregator[groupbyNodeNum]; @@ -88,7 +88,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { outputColumnNum = plan.getOutSchema().size(); int allGroupbyOutColNum = 0; - for (GroupbyNode eachGroupby: plan.getGroupByNodes()) { + for (GroupbyNode eachGroupby: plan.getSubPlans()) { allGroupbyOutColNum += eachGroupby.getOutSchema().size(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java index 383ccd3..cce9a24 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java @@ -100,7 +100,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { numGroupingColumns = plan.getGroupingColumns().length; - List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + List<GroupbyNode> groupbyNodes = plan.getSubPlans(); // Finding distinct group by column index. Set<Integer> groupingKeyIndexSet = new HashSet<Integer>(); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index 06b241c..6641633 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -47,13 +47,13 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { super(context, plan.getInSchema(), plan.getOutSchema()); this.plan = plan; this.aggregateExecs = aggregateExecs; - this.groupbyNodeNum = plan.getGroupByNodes().size(); + this.groupbyNodeNum = plan.getSubPlans().size(); currentTuples = new Tuple[groupbyNodeNum]; outColumnNum = outSchema.size(); int allGroupbyOutColNum = 0; - for (GroupbyNode eachGroupby: plan.getGroupByNodes()) { + for (GroupbyNode eachGroupby: plan.getSubPlans()) { allGroupbyOutColNum += eachGroupby.getOutSchema().size(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java index ff6fc4a..a76b91d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -66,7 +66,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { numGroupingColumns = plan.getGroupingColumns().length; resultTupleLength = numGroupingColumns; - List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + List<GroupbyNode> groupbyNodes = plan.getSubPlans(); List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>(); int inTupleIndex = 1 + numGroupingColumns; http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java index a3e586a..2fa272a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java @@ -28,6 +28,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.worker.FetchImpl; import java.util.List; @@ -38,7 +39,7 @@ public interface TaskRequest extends ProtoObject<TajoWorkerProtocol.TaskRequestP public List<CatalogProtos.FragmentProto> getFragments(); public String getOutputTableId(); public boolean isClusteredOutput(); - public String getSerializedData(); + public PlanProto.LogicalNodeTree getPlan(); public boolean isInterQuery(); public void setInterQuery(); public void addFetch(String name, FetchImpl fetch); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java index cef5488..b4727dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java @@ -25,6 +25,7 @@ import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProtoOrBuilder; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.worker.FetchImpl; import java.util.ArrayList; @@ -39,7 +40,7 @@ public class TaskRequestImpl implements TaskRequest { private String outputTable; private boolean isUpdated; private boolean clusteredOutput; - private String serializedData; // logical node + private PlanProto.LogicalNodeTree plan; // logical node private Boolean interQuery; private List<FetchImpl> fetches; private Boolean shouldDie; @@ -59,9 +60,10 @@ public class TaskRequestImpl implements TaskRequest { public TaskRequestImpl(TaskAttemptId id, List<FragmentProto> fragments, String outputTable, boolean clusteredOutput, - String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) { + PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel channel, + Enforcer enforcer) { this(); - this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer); + this.set(id, fragments, outputTable, clusteredOutput, plan, queryContext, channel, enforcer); } public TaskRequestImpl(TaskRequestProto proto) { @@ -73,12 +75,12 @@ public class TaskRequestImpl implements TaskRequest { public void set(TaskAttemptId id, List<FragmentProto> fragments, String outputTable, boolean clusteredOutput, - String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) { + PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) { this.id = id; this.fragments = fragments; this.outputTable = outputTable; this.clusteredOutput = clusteredOutput; - this.serializedData = serializedData; + this.plan = plan; this.isUpdated = true; this.queryContext = queryContext; this.queryContext = queryContext; @@ -150,16 +152,16 @@ public class TaskRequestImpl implements TaskRequest { } @Override - public String getSerializedData() { + public PlanProto.LogicalNodeTree getPlan() { TaskRequestProtoOrBuilder p = viaProto ? proto : builder; - if (this.serializedData != null) { - return this.serializedData; + if (this.plan != null) { + return this.plan; } - if (!p.hasSerializedData()) { + if (!p.hasPlan()) { return null; } - this.serializedData = p.getSerializedData(); - return this.serializedData; + this.plan = p.getPlan(); + return this.plan; } public boolean isInterQuery() { @@ -292,8 +294,8 @@ public class TaskRequestImpl implements TaskRequest { if (this.isUpdated) { builder.setClusteredOutput(this.clusteredOutput); } - if (this.serializedData != null) { - builder.setSerializedData(this.serializedData); + if (this.plan != null) { + builder.setPlan(this.plan); } if (this.interQuery != null) { builder.setInterQuery(this.interQuery); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java index 9787276..29dc845 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java @@ -18,23 +18,25 @@ package org.apache.tajo.engine.utils.test; +import org.apache.tajo.OverridableConf; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; -import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -public class ErrorInjectionRewriter implements RewriteRule { +@SuppressWarnings("unused") +public class ErrorInjectionRewriter implements LogicalPlanRewriteRule { @Override public String getName() { return "ErrorInjectionRewriter"; } @Override - public boolean isEligible(LogicalPlan plan) { + public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { return true; } @Override - public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { + public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException { throw new NullPointerException(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index dd6233c..1cd6587 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -35,13 +35,15 @@ import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; +import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.master.querymaster.Stage; -import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.plan.serder.LogicalNodeSerializer; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; @@ -125,7 +127,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { builder.setId(NULL_ATTEMPT_ID.getProto()); builder.setShouldDie(true); builder.setOutputTable(""); - builder.setSerializedData(""); + builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); builder.setClusteredOutput(false); stopTaskRunnerReq = builder.build(); } @@ -838,7 +840,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { new ArrayList<FragmentProto>(task.getAllFragments()), "", false, - task.getLogicalPlan().toJson(), + LogicalNodeSerializer.serialize(task.getLogicalPlan()), context.getMasterContext().getQueryContext(), stage.getDataChannel(), stage.getBlock().getEnforcer()); if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { @@ -894,7 +896,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { Lists.newArrayList(task.getAllFragments()), "", false, - task.getLogicalPlan().toJson(), + LogicalNodeSerializer.serialize(task.getLogicalPlan()), context.getMasterContext().getQueryContext(), stage.getDataChannel(), stage.getBlock().getEnforcer()); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index d7e7670..51964f0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -94,6 +94,7 @@ public class GlobalEngine extends AbstractService { annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog()); } catch (Throwable t) { LOG.error(t.getMessage(), t); + throw new RuntimeException(t); } super.start(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 3585ae7..10701f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -144,10 +144,10 @@ public class QueryExecutor { // others } else { - if (setSessionNode.isDefaultValue()) { - session.removeVariable(varName); - } else { + if (setSessionNode.hasValue()) { session.setVariable(varName, setSessionNode.getValue()); + } else { + session.removeVariable(varName); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index e3d3d79..720d60a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -37,13 +37,11 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; -import org.apache.tajo.master.exec.prehook.InsertIntoHook; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.plan.logical.LogicalNode; @@ -53,7 +51,6 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.GlobalEngine; import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; @@ -380,10 +377,10 @@ public class QueryMasterTask extends CompositeService { if (tableDesc == null) { throw new VerifyException("Can't get table meta data from catalog: " + tableName); } - List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( + List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( getQueryTaskContext().getQueryContext(), tableDesc); if (storageSpecifiedRewriteRules != null) { - for (RewriteRule eachRule: storageSpecifiedRewriteRules) { + for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) { optimizer.addRuleAfterToJoinOpt(eachRule); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 70a3202..5f9c6ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -39,8 +39,8 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.LogicalNodeDeserializer; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; @@ -124,7 +124,7 @@ public class Task { this.context.setEnforcer(request.getEnforcer()); this.inputStats = new TableStats(); - plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class); + plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan()); LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); if (scanNode != null) { for (LogicalNode node : scanNode) { http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index 5acbcd9..b8c9575 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -70,7 +70,7 @@ message TaskRequestProto { repeated FragmentProto fragments = 2; required string outputTable = 3; required bool clusteredOutput = 4; - required string serializedData = 5; + required LogicalNodeTree plan = 5; optional bool interQuery = 6 [default = false]; repeated FetchProto fetches = 7; optional bool shouldDie = 8; @@ -261,7 +261,7 @@ message JoinEnforce { MERGE_JOIN = 4; } - required int32 pid = 1; + required int32 nodeId = 1; required JoinAlgorithm algorithm = 2; } @@ -271,7 +271,7 @@ message GroupbyEnforce { SORT_AGGREGATION = 1; } - required int32 pid = 1; + required int32 nodeId = 1; required GroupbyAlgorithm algorithm = 2; repeated SortSpecProto sortSpecs = 3; } @@ -282,7 +282,7 @@ message SortEnforce { MERGE_SORT = 1; } - required int32 pid = 1; + required int32 nodeId = 1; required SortAlgorithm algorithm = 2; } @@ -296,7 +296,7 @@ message ColumnPartitionEnforcer { SORT_PARTITION = 1; } - required int32 pid = 1; + required int32 nodeId = 1; required ColumnPartitionAlgorithm algorithm = 2; } @@ -313,10 +313,10 @@ message DistinctGroupbyEnforcer { } message SortSpecArray { - required int32 pid = 1; + required int32 nodeId = 1; repeated SortSpecProto sortSpecs = 2; } - required int32 pid = 1; + required int32 nodeId = 1; required DistinctAggregationAlgorithm algorithm = 2; repeated SortSpecArray sortSpecArrays = 3; required bool isMultipleAggregation = 4 [default = false]; http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 7dc1089..5ff637c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -42,12 +42,14 @@ import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.querymaster.StageState; import org.apache.tajo.master.rm.TajoWorkerResourceManager; +import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; @@ -57,10 +59,7 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.URL; import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.List; -import java.util.TimeZone; -import java.util.UUID; +import java.util.*; public class TajoTestingCluster { private static Log LOG = LogFactory.getLog(TajoTestingCluster.class); @@ -119,10 +118,18 @@ public class TajoTestingCluster { } void initPropertiesAndConfigs() { + + // Set time zone TimeZone testDefaultTZ = TimeZone.getTimeZone(TajoConstants.DEFAULT_SYSTEM_TIMEZONE); conf.setSystemTimezone(testDefaultTZ); TimeZone.setDefault(testDefaultTZ); + // Injection of equality testing code of logical plan (de)serialization + conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, LogicalPlanTestRuleProvider.class); + conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, GlobalPlanTestRuleProvider.class); + + + // default resource manager if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname); Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName())); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index e286b92..4e4b710 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -38,8 +38,8 @@ import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.serder.EvalTreeProtoDeserializer; -import org.apache.tajo.plan.serder.EvalTreeProtoSerializer; +import org.apache.tajo.plan.serder.EvalNodeDeserializer; +import org.apache.tajo.plan.serder.EvalNodeSerializer; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.serder.PlanProto; @@ -62,7 +62,9 @@ import java.util.TimeZone; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public class ExprTestBase { private static TajoTestingCluster util; @@ -141,7 +143,7 @@ public class ExprTestBase { assertFalse(state.getErrorMessages().get(0), true); } LogicalPlan plan = planner.createPlan(context, expr, true); - optimizer.optimize(plan); + optimizer.optimize(context, plan); annotatedPlanVerifier.verify(context, state, plan); if (state.getErrorMessages().size() > 0) { @@ -318,7 +320,7 @@ public class ExprTestBase { } public static void assertEvalTreeProtoSerDer(OverridableConf context, EvalNode evalNode) { - PlanProto.EvalTree converted = EvalTreeProtoSerializer.serialize(evalNode); - assertEquals(evalNode, EvalTreeProtoDeserializer.deserialize(context, converted)); + PlanProto.EvalNodeTree converted = EvalNodeSerializer.serialize(evalNode); + assertEquals(evalNode, EvalNodeDeserializer.deserialize(context, converted)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index bfd1700..794c14f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -344,55 +344,84 @@ public class TestGroupByQuery extends QueryTestCaseBase { } @Test - public final void testDistinctAggregationCasebyCase() throws Exception { - ResultSet res; - + public final void testDistinctAggregationCasebyCase1() throws Exception { // one groupby, distinct, aggregation - res = executeFile("testDistinctAggregation_case1.sql"); + ResultSet res = executeFile("testDistinctAggregation_case1.sql"); assertResultSet(res, "testDistinctAggregation_case1.result"); res.close(); + } + @Test + public final void testDistinctAggregationCasebyCase2() throws Exception { // one groupby, two distinct, one aggregation - res = executeFile("testDistinctAggregation_case2.sql"); + ResultSet res = executeFile("testDistinctAggregation_case2.sql"); assertResultSet(res, "testDistinctAggregation_case2.result"); res.close(); + } + @Test + public final void testDistinctAggregationCasebyCase3() throws Exception { // one groupby, two distinct, two aggregation(no alias) - res = executeFile("testDistinctAggregation_case3.sql"); + ResultSet res = executeFile("testDistinctAggregation_case3.sql"); assertResultSet(res, "testDistinctAggregation_case3.result"); res.close(); + } + @Test + public final void testDistinctAggregationCasebyCase4() throws Exception { // two groupby, two distinct, two aggregation - res = executeFile("testDistinctAggregation_case4.sql"); + ResultSet res = executeFile("testDistinctAggregation_case4.sql"); assertResultSet(res, "testDistinctAggregation_case4.result"); res.close(); + } + @Test + public final void testDistinctAggregationCasebyCase5() throws Exception { // two groupby, two distinct, two aggregation with stage - res = executeFile("testDistinctAggregation_case5.sql"); + ResultSet res = executeFile("testDistinctAggregation_case5.sql"); assertResultSet(res, "testDistinctAggregation_case5.result"); res.close(); + } - res = executeFile("testDistinctAggregation_case6.sql"); + @Test + public final void testDistinctAggregationCasebyCase6() throws Exception { + ResultSet res = executeFile("testDistinctAggregation_case6.sql"); assertResultSet(res, "testDistinctAggregation_case6.result"); res.close(); + } - res = executeFile("testDistinctAggregation_case7.sql"); + @Test + public final void testDistinctAggregationCasebyCase7() throws Exception { + ResultSet res = executeFile("testDistinctAggregation_case7.sql"); assertResultSet(res, "testDistinctAggregation_case7.result"); res.close(); + } - res = executeFile("testDistinctAggregation_case8.sql"); + @Test + public final void testDistinctAggregationCasebyCase8() throws Exception { + ResultSet res = executeFile("testDistinctAggregation_case8.sql"); assertResultSet(res, "testDistinctAggregation_case8.result"); res.close(); + } - res = executeFile("testDistinctAggregation_case9.sql"); + @Test + public final void testDistinctAggregationCasebyCase9() throws Exception { + ResultSet res = executeFile("testDistinctAggregation_case9.sql"); assertResultSet(res, "testDistinctAggregation_case9.result"); res.close(); + } - res = executeFile("testDistinctAggregation_case10.sql"); + @Test + public final void testDistinctAggregationCasebyCase10() throws Exception { + ResultSet res = executeFile("testDistinctAggregation_case10.sql"); assertResultSet(res, "testDistinctAggregation_case10.result"); res.close(); + } + + @Test + public final void testDistinctAggregationCasebyCase11() throws Exception { + ResultSet res; - // case9 KeyValueSet tableOptions = new KeyValueSet(); tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); @@ -417,7 +446,7 @@ public class TestGroupByQuery extends QueryTestCaseBase { assertEquals(expected, resultSetToString(res)); - // multiple distinct with expression + // multiple distinct with expression res = executeString( "select count(distinct code) + count(distinct qty) from table10" );
