http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java index 9ac0e62..a690759 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java @@ -24,6 +24,11 @@ import java.util.regex.PatternSyntaxException; public class SimilarToPredicateEval extends PatternMatchPredicateEval { private static final String SIMILARTO_ESCAPE_SPATIAL_CHARACTERS = "([.])"; + public SimilarToPredicateEval(boolean not, EvalNode field, ConstEval pattern, + @SuppressWarnings("unused") boolean isCaseSensitive) { + super(EvalType.SIMILAR_TO, not, field, pattern, false); + } + public SimilarToPredicateEval(boolean not, EvalNode field, ConstEval pattern) { super(EvalType.SIMILAR_TO, not, field, pattern); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java index 15e34de..15b628b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java @@ -60,9 +60,6 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { case IF_THEN: result = visitIfThen(context, (CaseWhenEval.IfThenEval) evalNode, stack); break; - case IN: - result = visitInPredicate(context, (InEval) evalNode, stack); - break; // Functions case FUNCTION: http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java index 8c714c5..05640c9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java @@ -32,12 +32,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, true);\n" + "true", returnType = Type.BOOLEAN, - paramTypes = {@ParamTypes(paramTypes = {Type.BOOLEAN, TajoDataTypes.Type.BOOLEAN_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.BOOLEAN, TajoDataTypes.Type.BOOLEAN_ARRAY})} ) public class CoalesceBoolean extends Coalesce { public CoalesceBoolean() { super(new Column[] { - new Column("column", TajoDataTypes.Type.BOOLEAN), + new Column("param", TajoDataTypes.Type.BOOLEAN), new Column("params", TajoDataTypes.Type.BOOLEAN_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java index 23f8f0c..35df518 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java @@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, date '2014-01-01');\n" + "2014-01-01", returnType = Type.DATE, - paramTypes = {@ParamTypes(paramTypes = {Type.DATE, Type.DATE_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {Type.DATE_ARRAY})} ) public class CoalesceDate extends Coalesce { public CoalesceDate() { super(new Column[] { - new Column("column", TajoDataTypes.Type.DATE), new Column("params", TajoDataTypes.Type.DATE_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java index 3e94150..47c363f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java @@ -31,12 +31,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, 10.0);\n" + "10.0", returnType = TajoDataTypes.Type.FLOAT8, - paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8, TajoDataTypes.Type.FLOAT8_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8_ARRAY})} ) public class CoalesceDouble extends Coalesce { public CoalesceDouble() { super(new Column[] { - new Column("column", TajoDataTypes.Type.FLOAT8), new Column("params", TajoDataTypes.Type.FLOAT8_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java index 5d55255..f975615 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java @@ -31,13 +31,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, 10);\n" + "10", returnType = TajoDataTypes.Type.INT8, - paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8, TajoDataTypes.Type.INT8_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8_ARRAY})} ) public class CoalesceLong extends Coalesce { public CoalesceLong() { super(new Column[] { - new Column("column", TajoDataTypes.Type.INT8), new Column("params", TajoDataTypes.Type.INT8_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java index 50e4786..6441e00 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java @@ -31,13 +31,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, 'default');\n" + "default", returnType = TajoDataTypes.Type.TEXT, - paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.TEXT, TajoDataTypes.Type.TEXT_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.TEXT_ARRAY})} ) public class CoalesceString extends Coalesce { public CoalesceString() { super(new Column[] { - new Column("column", TajoDataTypes.Type.TEXT), new Column("params", TajoDataTypes.Type.TEXT_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java index 01bb6de..56cfe32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java @@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, time '12:10:00');\n" + "12:10:00", returnType = Type.TIME, - paramTypes = {@ParamTypes(paramTypes = {Type.TIME, Type.TIME_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {Type.TIME_ARRAY})} ) public class CoalesceTime extends Coalesce { public CoalesceTime() { super(new Column[] { - new Column("column", TajoDataTypes.Type.TIME), new Column("params", TajoDataTypes.Type.TIME_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java index 2609717..ec02e46 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java @@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes; example = "> SELECT coalesce(null, null, timestamp '2014-01-01');\n" + "2014-01-01 00:00:00", returnType = Type.TIMESTAMP, - paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP, Type.TIMESTAMP_ARRAY})} + paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP_ARRAY})} ) public class CoalesceTimestamp extends Coalesce { public CoalesceTimestamp() { super(new Column[] { - new Column("column", TajoDataTypes.Type.TIMESTAMP), new Column("params", TajoDataTypes.Type.TIMESTAMP_ARRAY), }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java index 6a3af98..574e32c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java @@ -439,11 +439,18 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva @Override public EvalNode visitConcatenate(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException { stack.push(expr); - EvalNode left = visit(ctx, stack, expr.getLeft()); - EvalNode right = visit(ctx, stack, expr.getRight()); + EvalNode lhs = visit(ctx, stack, expr.getLeft()); + EvalNode rhs = visit(ctx, stack, expr.getRight()); stack.pop(); - return new BinaryEval(EvalType.CONCATENATE, left, right); + if (lhs.getValueType().getType() != Type.TEXT) { + lhs = convertType(lhs, CatalogUtil.newSimpleDataType(Type.TEXT)); + } + if (rhs.getValueType().getType() != Type.TEXT) { + rhs = convertType(rhs, CatalogUtil.newSimpleDataType(Type.TEXT)); + } + + return new BinaryEval(EvalType.CONCATENATE, lhs, rhs); } private EvalNode visitPatternMatchPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr) @@ -599,7 +606,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva // trying the implicit type conversion between actual parameter types and the definition types. if (CatalogUtil.checkIfVariableLengthParamDefinition(TUtil.newList(funcDesc.getParamTypes()))) { - DataType lastDataType = null; + DataType lastDataType = funcDesc.getParamTypes()[0]; for (int i = 0; i < givenArgs.length; i++) { if (i < (funcDesc.getParamTypes().length - 1)) { // variable length lastDataType = funcDesc.getParamTypes()[i]; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index e34548c..2730202 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -55,6 +55,7 @@ import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Stack; @@ -147,6 +148,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { stack.push(selNode); leftExec = createPlanRecursive(ctx, selNode.getChild(), stack); stack.pop(); + return new SelectionExec(ctx, selNode, leftExec); case PROJECTION: @@ -154,6 +156,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { stack.push(prjNode); leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack); stack.pop(); + return new ProjectionExec(ctx, prjNode, leftExec); case TABLE_SUBQUERY: { @@ -210,6 +213,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack); rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack); stack.pop(); + return createJoinPlan(ctx, joinNode, leftExec, rightExec); case UNION: http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java index 161d39b..d8499d0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java @@ -18,34 +18,49 @@ package org.apache.tajo.engine.planner; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; public class Projector { + private final TaskAttemptContext context; private final Schema inSchema; + private final Target [] targets; // for projection private final int targetNum; private final EvalNode[] evals; - public Projector(Schema inSchema, Schema outSchema, Target [] targets) { + public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) { + this.context = context; this.inSchema = inSchema; if (targets == null) { - targets = PlannerUtil.schemaToTargets(outSchema); + this.targets = PlannerUtil.schemaToTargets(outSchema); + } else { + this.targets = targets; } - this.targetNum = targets.length; + + this.targetNum = this.targets.length; evals = new EvalNode[targetNum]; - for (int i = 0; i < targetNum; i++) { - evals[i] = targets[i].getEvalTree(); + + if (context.getQueryContext().getBool(SessionVars.CODEGEN)) { + EvalNode eval; + for (int i = 0; i < targetNum; i++) { + eval = this.targets[i].getEvalTree(); + evals[i] = context.getPrecompiledEval(inSchema, eval); + } + } else { + for (int i = 0; i < targetNum; i++) { + evals[i] = this.targets[i].getEvalTree(); + } } } public void eval(Tuple in, Tuple out) { - if (targetNum > 0) { - for (int i = 0; i < evals.length; i++) { - out.put(i, evals[i].eval(inSchema, in)); - } + for (int i = 0; i < evals.length; i++) { + out.put(i, evals[i].eval(inSchema, in)); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java index 6c45868..aa6d597 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java @@ -22,21 +22,26 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.PlanString; -public class HavingNode extends UnaryNode implements Cloneable { +public class HavingNode extends UnaryNode implements SelectableNode, Cloneable { @Expose private EvalNode qual; public HavingNode(int pid) { super(pid, NodeType.HAVING); } - public EvalNode getQual() { - return this.qual; - } + @Override + public boolean hasQual() { + return true; + } - public void setQual(EvalNode qual) { + public void setQual(EvalNode qual) { this.qual = qual; } + public EvalNode getQual() { + return this.qual; + } + @Override public boolean equals(Object obj) { if (obj instanceof HavingNode) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java index 9582dee..8d28e6e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java @@ -30,7 +30,7 @@ import org.apache.tajo.engine.planner.Target; import org.apache.tajo.engine.utils.SchemaUtil; import org.apache.tajo.util.TUtil; -public class ScanNode extends RelationNode implements Projectable, Cloneable { +public class ScanNode extends RelationNode implements Projectable, SelectableNode, Cloneable { @Expose protected TableDesc tableDesc; @Expose protected String alias; @Expose protected Schema logicalSchema; @@ -101,6 +101,7 @@ public class ScanNode extends RelationNode implements Projectable, Cloneable { } } + @Override public Schema getTableSchema() { return logicalSchema; } @@ -108,15 +109,18 @@ public class ScanNode extends RelationNode implements Projectable, Cloneable { public Schema getPhysicalSchema() { return getInSchema(); } - + + @Override public boolean hasQual() { return qual != null; } - + + @Override public EvalNode getQual() { return this.qual; } - + + @Override public void setQual(EvalNode evalTree) { this.qual = evalTree; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java new file mode 100644 index 0000000..7082f4b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.logical; + +import org.apache.tajo.engine.eval.EvalNode; + +/** + * An interface for logical node which is able to filter tuples. + */ +public interface SelectableNode { + + /** + * Checking if it has filter condition + * + * @return True if it has filter condition. Otherwise, it will return false. + */ + public boolean hasQual(); + + /** + * Set a filter condition. + * + * @param eval EvalNode resulting in a boolean result. + */ + public void setQual(EvalNode eval); + + /** + * Get a filter condition + * + * @return Filter Condition + */ + public EvalNode getQual(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java index b8a9680..3bbbd82 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java @@ -22,21 +22,26 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.PlanString; -public class SelectionNode extends UnaryNode implements Cloneable { +public class SelectionNode extends UnaryNode implements SelectableNode, Cloneable { @Expose private EvalNode qual; public SelectionNode(int pid) { super(pid, NodeType.SELECTION); } - public EvalNode getQual() { - return this.qual; - } + @Override + public boolean hasQual() { + return true; + } - public void setQual(EvalNode qual) { + public void setQual(EvalNode qual) { this.qual = qual; } + public EvalNode getQual() { + return this.qual; + } + @Override public PlanString getPlanString() { PlanString planStr = new PlanString(this); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java index 60a7c19..91cefa1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java @@ -79,13 +79,20 @@ public class BNLJoinExec extends BinaryPhysicalExec { plan.setTargets(PlannerUtil.schemaToTargets(outSchema)); } - projector = new Projector(inSchema, outSchema, plan.getTargets()); + projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); outputTuple = new VTuple(outSchema.size()); } + @Override + protected void compile() { + if (hasJoinQual) { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + } + public JoinNode getPlan() { return plan; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 35de707..f831525 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -59,7 +59,7 @@ public class BSTIndexScanExec extends PhysicalExec { this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(), scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema); this.fileScanner.init(); - this.projector = new Projector(inSchema, outSchema, scanNode.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); this.reader = new BSTIndex(sm.getFileSystem().getConf()). getIndexReader(fileName, keySchema, comparator); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java index f6f3e52..42611b0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java @@ -18,6 +18,8 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.engine.planner.PhysicalPlanningException; + import java.util.Stack; public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java index 628c18c..03ec396 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java @@ -52,6 +52,8 @@ public abstract class BinaryPhysicalExec extends PhysicalExec { leftChild.init(); rightChild.init(); progress = 0.0f; + + super.init(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 6215527..700e34d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -34,6 +34,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.planner.PhysicalPlanningException; import org.apache.tajo.engine.planner.logical.SortNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.Scanner; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 65ebe2f..9dabbb3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; +import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.Projector; @@ -91,7 +92,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { } // for projection - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); @@ -102,6 +103,11 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { rightNumCols = inner.getSchema().size(); } + @Override + protected void compile() throws CompilationError { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { for (int i = 0; i < leftKeyList.length; i++) { keyTuple.put(i, outerTuple.get(leftKeyList[i])); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index a5e9df0..426a7a1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -82,7 +82,7 @@ public class HashJoinExec extends BinaryPhysicalExec { } // for projection - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); @@ -90,6 +90,11 @@ public class HashJoinExec extends BinaryPhysicalExec { leftKeyTuple = new VTuple(leftKeyList.length); } + @Override + protected void compile() { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { for (int i = 0; i < leftKeyList.length; i++) { keyTuple.put(i, outerTuple.get(leftKeyList[i])); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index ac8b28f..b752db5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -109,7 +109,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { } // for projection - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); @@ -119,6 +119,11 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { rightNumCols = rightChild.getSchema().size(); } + @Override + protected void compile() { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { for (int i = 0; i < leftKeyList.length; i++) { keyTuple.put(i, outerTuple.get(leftKeyList[i])); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 4fbb5e4..4fdd03a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -45,6 +45,10 @@ public class HashLeftSemiJoinExec extends HashJoinExec { } } + protected void compile() { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + /** * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk). * next() method finds the first unmatched tuple from both tables. http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index ff1f7b3..e1cc6a8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -88,7 +88,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema()); // for projection - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); @@ -98,6 +98,11 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { rightNumCols = rightChild.getSchema().size(); } + @Override + protected void compile() { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + public JoinNode getPlan(){ return this.joinNode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index 470e1c9..bbfe973 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -84,13 +84,18 @@ public class MergeJoinExec extends BinaryPhysicalExec { this.innerIterator = innerTupleSlots.iterator(); // for projection - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); } + @Override + protected void compile() { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + public JoinNode getPlan(){ return this.joinNode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java index 6e5900e..dc061ed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java @@ -54,7 +54,7 @@ public class NLJoinExec extends BinaryPhysicalExec { } // for projection - projector = new Projector(inSchema, outSchema, plan.getTargets()); + projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join needNewOuter = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java index 5c17c40..37ef7df 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java @@ -57,7 +57,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec { } // for projection - projector = new Projector(inSchema, outSchema, plan.getTargets()); + projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join needNextRightTuple = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index 9fa5b76..d87a30d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -62,8 +62,9 @@ public class PartitionMergeScanExec extends PhysicalExec { public void init() throws IOException { for (CatalogProtos.FragmentProto fragment : fragments) { - scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan), - new CatalogProtos.FragmentProto[] {fragment})); + SeqScanExec scanExec = new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan), + new CatalogProtos.FragmentProto[] {fragment}); + scanners.add(scanExec); } progress = 0.0f; rescan(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index e30a10b..31cfc4d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -20,9 +20,11 @@ package org.apache.tajo.engine.planner.physical; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.Path; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -47,7 +49,14 @@ public abstract class PhysicalExec implements SchemaObject { return outSchema; } - public abstract void init() throws IOException; + public void init() throws IOException { + if (context.getQueryContext().getBool(SessionVars.CODEGEN)) { + this.compile(); + } + } + + protected void compile() throws CompilationError { + } public abstract Tuple next() throws IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java index 738db62..505b599 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java @@ -18,6 +18,8 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.engine.planner.PhysicalPlanningException; + import java.util.Stack; public interface PhysicalExecutorVisitor<CONTEXT, RESULT> { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index 0909c76..2f55cf7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.engine.planner.PhysicalPlanningException; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.engine.planner.logical.PersistentStoreNode; import org.apache.tajo.engine.query.QueryContext; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java deleted file mode 100644 index 62add1e..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.planner.physical; - -import java.io.IOException; - -public class PhysicalPlanningException extends IOException { - public PhysicalPlanningException(String message) { - super(message); - } - - public PhysicalPlanningException(Exception ioe) { - super(ioe); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java index ee6ef1d..89cd75a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java @@ -46,7 +46,7 @@ public class ProjectionExec extends UnaryPhysicalExec { super.init(); this.outTuple = new VTuple(outSchema.size()); - this.projector = new Projector(inSchema, outSchema, this.plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, this.plan.getTargets()); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index 365fc22..5d4dad5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -87,7 +87,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { plan.getJoinQual(), outer.getSchema(), inner.getSchema()); // for projection - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); // for join frameTuple = new FrameTuple(); @@ -96,6 +96,11 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { leftNumCols = outer.getSchema().size(); } + @Override + protected void compile() { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + public JoinNode getPlan() { return this.joinNode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java index 2e676e9..5ae9a8f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.logical.SelectionNode; import org.apache.tajo.storage.Tuple; @@ -26,7 +27,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; public class SelectionExec extends UnaryPhysicalExec { - private final EvalNode qual; + private EvalNode qual; public SelectionExec(TaskAttemptContext context, SelectionNode plan, @@ -36,6 +37,11 @@ public class SelectionExec extends UnaryPhysicalExec { } @Override + public void compile() throws CompilationError { + qual = context.getPrecompiledEval(inSchema, qual); + } + + @Override public Tuple next() throws IOException { Tuple tuple; while ((tuple = child.next()) != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 507cb6c..2f0c12f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -26,6 +26,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.eval.ConstEval; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.eval.EvalTreeUtil; @@ -66,8 +67,8 @@ public class SeqScanExec extends PhysicalExec { private boolean cacheRead = false; - public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, - ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException { + public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan, + CatalogProtos.FragmentProto [] fragments) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); this.plan = plan; @@ -87,6 +88,12 @@ public class SeqScanExec extends PhysicalExec { cacheKey = new TupleCacheKey( context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey); } + + if (fragments != null + && plan.getTableDesc().hasPartition() + && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) { + rewriteColumnPartitionedTableSchema(); + } } /** @@ -120,12 +127,16 @@ public class SeqScanExec extends PhysicalExec { Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow); ConstEval constExpr = new ConstEval(datum); - for (Target target : plan.getTargets()) { + + for (int i = 0; i < plan.getTargets().length; i++) { + Target target = plan.getTargets()[i]; + if (target.getEvalTree().equals(targetExpr)) { if (!target.hasAlias()) { target.setAlias(target.getEvalTree().getName()); } target.setExpr(constExpr); + } else { EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr); } @@ -140,12 +151,6 @@ public class SeqScanExec extends PhysicalExec { public void init() throws IOException { Schema projected; - if (fragments != null - && plan.getTableDesc().hasPartition() - && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) { - rewriteColumnPartitionedTableSchema(); - } - if (plan.hasTargets()) { projected = new Schema(); Set<Column> columnSet = new HashSet<Column>(); @@ -193,10 +198,19 @@ public class SeqScanExec extends PhysicalExec { } else { initScanner(projected); } + + super.init(); + } + + @Override + protected void compile() throws CompilationError { + if (plan.hasQual()) { + qual = context.getPrecompiledEval(inSchema, qual); + } } private void initScanner(Schema projected) throws IOException { - this.projector = new Projector(inSchema, outSchema, plan.getTargets()); + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); if (fragments != null) { if (fragments.length > 1) { this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(), http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java index ab67d7b..2c63ea6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java @@ -46,13 +46,17 @@ public abstract class UnaryPhysicalExec extends PhysicalExec { this.child = child; } + @Override public void init() throws IOException { progress = 0.0f; if (child != null) { child.init(); } + + super.init(); } + @Override public void rescan() throws IOException { progress = 0.0f; if (child != null) { @@ -60,6 +64,7 @@ public abstract class UnaryPhysicalExec extends PhysicalExec { } } + @Override public void close() throws IOException { progress = 1.0f; if (child != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java index 56df48d..ef82427 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java @@ -161,7 +161,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { this.serializedData = p.getSerializedData(); return this.serializedData; } - + public boolean isInterQuery() { QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; if (interQuery != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index fc1be9b..2c62d42 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -60,6 +60,7 @@ import org.apache.tajo.master.querymaster.QueryJobManager; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.session.Session; import org.apache.tajo.storage.*; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -115,9 +116,20 @@ public class GlobalEngine extends AbstractService { super.stop(); } + private QueryContext createQueryContext(Session session) { + QueryContext newQueryContext = new QueryContext(context.getConf(), session); + + String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY); + if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) { + newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest()); + } + + return newQueryContext; + } + public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) { LOG.info("Query: " + query); - QueryContext queryContext = new QueryContext(context.getConf(), session); + QueryContext queryContext = createQueryContext(session); Expr planningContext; try { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java new file mode 100644 index 0000000..9a4a01d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.engine.query.QueryContext; + +import java.util.Collection; + +public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent { + private final QueryContext queryContext; + private final String planJson; + + public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId, + Collection<Container> containers, QueryContext queryContext, String planJson) { + super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers); + this.queryContext = queryContext; + this.planJson = planJson; + } + + public QueryContext getQueryContext() { + return queryContext; + } + + public String getPlanJson() { + return planJson; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index cb7861c..f3e4b72 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.QueryMasterTask; @@ -42,10 +43,15 @@ import java.util.ArrayList; import java.util.List; public class TajoContainerProxy extends ContainerProxy { + private final QueryContext queryContext; + private final String planJson; + public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, Container container, - ExecutionBlockId executionBlockId) { + QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) { super(context, conf, executionBlockId, container); + this.queryContext = queryContext; + this.planJson = planJson; } @Override @@ -101,6 +107,8 @@ public class TajoContainerProxy extends ContainerProxy { .setNodeId(container.getNodeId().toString()) .setContainerId(container.getId().toString()) .setQueryOutputPath(context.getStagingDir().toString()) + .setQueryContext(queryContext.getProto()) + .setPlanJson(planJson) .build(); tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get()); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 4e51460..8d4a6a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -214,7 +214,7 @@ public class TajoMaster extends CompositeService { } private void initWebServer() throws Exception { - if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) { + if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS); webServer = StaticHttpServer.getInstance(this ,"admin", address.getHostName(), address.getPort(), true, null, context.getConf(), null); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index deadd39..3a86802 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -190,7 +190,6 @@ public class QueryMaster extends CompositeService implements EventHandler { builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); - List<IntermediateEntryProto> intermediateEntries = new ArrayList<IntermediateEntryProto>(); for (TajoMasterProtocol.WorkerResourceProto worker : workers) { try { if (worker.getPeerRpcPort() == 0) continue; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 40c5406..87da175 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -42,6 +42,8 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.plan.proto.PlanProto; import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; @@ -1034,8 +1036,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!"); subQuery.eventHandler.handle( - new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH, - subQuery.getId(), allocationEvent.getAllocatedContainer()) + new LaunchTaskRunnersEvent(subQuery.getId(), allocationEvent.getAllocatedContainer(), + subQuery.getContext().getQueryContext(), + CoreGsonHelper.toJson(subQuery.getBlock().getPlan(), LogicalNode.class)) ); subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START)); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java new file mode 100644 index 0000000..a70fbfd --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.engine.codegen.ExecutorPreCompiler; +import org.apache.tajo.engine.codegen.TajoClassLoader; +import org.apache.tajo.engine.eval.EvalNode; +import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.planner.PlanningException; +import org.apache.tajo.engine.planner.logical.LogicalNode; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.util.Pair; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ExecutionBlockSharedResource { + private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class); + private AtomicBoolean initializing = new AtomicBoolean(false); + private volatile Boolean resourceInitSuccess = new Boolean(false); + private CountDownLatch initializedResourceLatch = new CountDownLatch(1); + + // Query + private QueryContext context; + + // Resources + private TajoClassLoader classLoader; + private ExecutorPreCompiler.CompilationContext compilationContext; + private LogicalNode plan; + private boolean codeGenEnabled = false; + + public void initialize(final QueryContext context, final String planJson) throws InterruptedException { + + if (!initializing.getAndSet(true)) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + ExecutionBlockSharedResource.this.context = context; + initPlan(planJson); + initCodeGeneration(); + resourceInitSuccess = true; + } catch (Throwable t) { + LOG.error(t); + LOG.error(ExceptionUtils.getStackTrace(t)); + } finally { + initializedResourceLatch.countDown(); + } + } + }); + thread.run(); + thread.join(); + + if (!resourceInitSuccess) { + throw new RuntimeException("Resource cannot be initialized"); + } + } + } + + private void initPlan(String planJson) { + plan = CoreGsonHelper.fromJson(planJson, LogicalNode.class); + } + + private void initCodeGeneration() throws PlanningException { + if (context.getBool(SessionVars.CODEGEN)) { + codeGenEnabled = true; + classLoader = new TajoClassLoader(); + compilationContext = new ExecutorPreCompiler.CompilationContext(classLoader); + ExecutorPreCompiler.compile(compilationContext, plan); + } + } + + public boolean awaitInitializedResource() throws InterruptedException { + initializedResourceLatch.await(); + return resourceInitSuccess; + } + + public LogicalNode getPlan() { + return this.plan; + } + + public EvalNode compileEval(Schema schema, EvalNode eval) { + return compilationContext.getCompiler().compile(schema, eval); + } + + public EvalNode getPreCompiledEval(Schema schema, EvalNode eval) { + if (codeGenEnabled) { + + Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval); + if (compilationContext.getPrecompiedEvals().containsKey(key)) { + return compilationContext.getPrecompiedEvals().get(key); + } else { + try { + LOG.warn(eval.toString() + " does not exists. Immediately compile it: " + eval); + return compileEval(schema, eval); + } catch (Throwable t) { + LOG.warn(t); + return eval; + } + } + } else { + throw new IllegalStateException("CodeGen is disabled"); + } + } + + public void release() { + compilationContext = null; + + if (classLoader != null) { + try { + classLoader.clean(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + classLoader = null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index c7e513d..aaff69c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -31,10 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.master.ContainerProxy; -import org.apache.tajo.master.TajoContainerProxy; -import org.apache.tajo.master.TaskRunnerGroupEvent; -import org.apache.tajo.master.TaskRunnerLauncher; +import org.apache.tajo.master.*; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.SubQueryContainerAllocationEvent; @@ -143,19 +140,20 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { @Override public void handle(TaskRunnerGroupEvent event) { if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) { - launchTaskRunners(event.getExecutionBlockId(), event.getContainers()); + LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event; + launchTaskRunners(launchEvent); } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) { stopContainers(event.getContainers()); } } } - private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) { + private void launchTaskRunners(LaunchTaskRunnersEvent event) { // Query in standby mode doesn't need launch Worker. // But, Assign ExecutionBlock to assigned tajo worker - for(Container eachContainer: containers) { + for(Container eachContainer: event.getContainers()) { TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf, - eachContainer, executionBlockId); + eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson()); executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index d8d09e1..f76176d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -28,12 +28,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.shell.PathData; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.ha.TajoMasterInfo; import org.apache.tajo.master.querymaster.QueryMaster; @@ -55,6 +57,7 @@ import java.lang.management.ThreadMXBean; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -207,7 +210,7 @@ public class TajoWorker extends CompositeService { addService(pullService); } - if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) { + if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { try { httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort(); if(queryMasterMode && !taskRunnerMode) { @@ -349,6 +352,9 @@ public class TajoWorker extends CompositeService { } public class WorkerContext { + private ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource> sharedResourceMap = + new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>(); + public QueryMaster getQueryMaster() { if(queryMasterManagerService == null) { return null; @@ -356,6 +362,10 @@ public class TajoWorker extends CompositeService { return queryMasterManagerService.getQueryMaster(); } + public TajoConf getConf() { + return systemConf; + } + public TajoWorkerManagerService getTajoWorkerManagerService() { return tajoWorkerManagerService; } @@ -398,6 +408,25 @@ public class TajoWorker extends CompositeService { } } + public void initSharedResource(QueryContext queryContext, ExecutionBlockId blockId, String planJson) + throws InterruptedException { + + if (!sharedResourceMap.containsKey(blockId)) { + ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource(); + if (sharedResourceMap.putIfAbsent(blockId, resource) == null) { + resource.initialize(queryContext, planJson); + } + } + } + + public ExecutionBlockSharedResource getSharedResource(ExecutionBlockId blockId) { + return sharedResourceMap.get(blockId); + } + + public void releaseSharedResource(ExecutionBlockId blockId) { + sharedResourceMap.remove(blockId).release(); + } + protected void cleanup(String strPath) { if(deletionService == null) return; http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index c5f1446..fa116c3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -25,16 +25,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.ExecutionBlockReport; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TajoIdUtils; import java.net.InetSocketAddress; @@ -118,7 +116,12 @@ public class TajoWorkerManagerService extends CompositeService TajoWorkerProtocol.RunExecutionBlockRequestProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc(); + try { + workerContext.initSharedResource( + new QueryContext(workerContext.getConf(), request.getQueryContext()), + TajoIdUtils.createExecutionBlockId(request.getExecutionBlockId()), request.getPlanJson()); + String[] params = new String[7]; params[0] = "standby"; //mode(never used) params[1] = request.getExecutionBlockId(); @@ -132,8 +135,8 @@ public class TajoWorkerManagerService extends CompositeService params[6] = request.getQueryOutputPath(); workerContext.getTaskRunnerManager().startTask(params); done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); done.run(TajoWorker.FALSE_PROTO); } } @@ -163,6 +166,9 @@ public class TajoWorkerManagerService extends CompositeService workerContext.cleanup(inputDir); String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); workerContext.cleanup(outputDir); + + // Release shared resources + workerContext.releaseSharedResource(new ExecutionBlockId(executionBlockIdProto)); } done.run(TajoWorker.TRUE_PROTO); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 1881685..d0665ae 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -157,6 +157,20 @@ public class Task { this.reporter = new Reporter(taskId, masterProxy); this.reporter.startCommunicationThread(); + + // resource intiailization + boolean resourceInitialized = false; + try { + resourceInitialized = context.getSharedResource().awaitInitializedResource(); + } catch (InterruptedException e) { + LOG.error("Failed Resource Initialization", e); + } finally { + if (!resourceInitialized) { + setState(TaskAttemptState.TA_FAILED); + return; + } + } + plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class); LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); if (scanNode != null) { @@ -218,25 +232,27 @@ public class Task { } public void init() throws IOException { - // initialize a task temporal dir - localFS.mkdirs(taskDir); - - if (request.getFetches().size() > 0) { - inputTableBaseDir = localFS.makeQualified( - lDirAllocator.getLocalPathForWrite( - getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); - localFS.mkdirs(inputTableBaseDir); - Path tableDir; - for (String inputTable : context.getInputTables()) { - tableDir = new Path(inputTableBaseDir, inputTable); - if (!localFS.exists(tableDir)) { - LOG.info("the directory is created " + tableDir.toUri()); - localFS.mkdirs(tableDir); + if (context.getState() == TaskAttemptState.TA_PENDING) { + // initialize a task temporal dir + localFS.mkdirs(taskDir); + + if (request.getFetches().size() > 0) { + inputTableBaseDir = localFS.makeQualified( + lDirAllocator.getLocalPathForWrite( + getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); + localFS.mkdirs(inputTableBaseDir); + Path tableDir; + for (String inputTable : context.getInputTables()) { + tableDir = new Path(inputTableBaseDir, inputTable); + if (!localFS.exists(tableDir)) { + LOG.info("the directory is created " + tableDir.toUri()); + localFS.mkdirs(tableDir); + } } } + // for localizing the intermediate data + localize(request); } - // for localizing the intermediate data - localize(request); } public QueryUnitAttemptId getTaskId() { @@ -426,14 +442,21 @@ public class Task { while(!killed && executor.next() != null) { } - this.executor.close(); - reloadInputStats(); - this.executor = null; } catch (Exception e) { error = e ; LOG.error(e.getMessage(), e); aborted = true; } finally { + if (executor != null) { + try { + executor.close(); + reloadInputStats(); + } catch (IOException e) { + e.printStackTrace(); + } + this.executor = null; + } + context.setProgress(1.0f); taskRunnerContext.completedTasksNum.incrementAndGet(); context.getHashShuffleAppenderManager().finalizeTask(taskId); http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 6cb4bd7..d27fd6d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -26,8 +26,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.query.QueryContext; @@ -75,16 +77,23 @@ public class TaskAttemptContext { private Enforcer enforcer; private QueryContext queryContext; private WorkerContext workerContext; + private ExecutionBlockSharedResource sharedResource; /** a output volume for each partition */ private Map<Integer, Long> partitionOutputVolume; private HashShuffleAppenderManager hashShuffleAppenderManager; - public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext, final QueryUnitAttemptId queryId, + public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext, + final QueryUnitAttemptId queryId, final FragmentProto[] fragments, final Path workDir) { this.queryContext = queryContext; - this.workerContext = workerContext; + + if (workerContext != null) { // For unit tests + this.workerContext = workerContext; + this.sharedResource = workerContext.getSharedResource(queryId.getQueryUnitId().getExecutionBlockId()); + } + this.queryId = queryId; if (fragments != null) { @@ -154,6 +163,23 @@ public class TaskAttemptContext { return this.enforcer; } + public ExecutionBlockSharedResource getSharedResource() { + return sharedResource; + } + + public EvalNode compileEval(Schema schema, EvalNode eval) { + return sharedResource.compileEval(schema, eval); + } + + public EvalNode getPrecompiledEval(Schema schema, EvalNode eval) { + if (sharedResource != null) { + return sharedResource.getPreCompiledEval(schema, eval); + } else { + LOG.debug("Shared resource is not initialized. It is NORMAL in unit tests"); + return eval; + } + } + public boolean hasResultStats() { return resultStats != null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index cdb1438..e100c48 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -207,6 +207,9 @@ message RunExecutionBlockRequestProto { required string nodeId = 4; required string containerId = 5; optional string queryOutputPath = 6; + + required KeyValueSetProto queryContext = 7; + required string planJson = 8; } message ExecutionBlockListProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 271ba70..4d9ca67 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -33,6 +33,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.session.Session; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TajoIdUtils; @@ -66,12 +67,15 @@ public class LocalTajoTestingUtility { public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) { return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0); } + public static Session createDummySession() { return new Session(UUID.randomUUID().toString(), dummyUserInfo.getUserName(), TajoConstants.DEFAULT_DATABASE_NAME); } public static QueryContext createDummyContext(TajoConf conf) { - return new QueryContext(conf, createDummySession()); + QueryContext context = new QueryContext(conf, createDummySession()); + context.putAll(CommonTestingUtil.getSessionVarsForTest().getAllKeyValus()); + return context; } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 7b87112..346fa69 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -92,9 +92,16 @@ public class TajoTestingCluster { public TajoTestingCluster(boolean masterHaEMode) { this.conf = new TajoConf(); this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode); + + setTestingFlagProperties(); initPropertiesAndConfigs(); } + void setTestingFlagProperties() { + System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + } + void initPropertiesAndConfigs() { if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname); @@ -106,7 +113,6 @@ public class TajoTestingCluster { this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS) .indexOf(TajoWorkerResourceManager.class.getName()) >= 0; - conf.set(CommonTestingUtil.TAJO_TEST, "TRUE"); } public TajoConf getConfiguration() {
