TAJO-1436: Add Bind method to EvalNode. Closes #456
Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f4c9e54c Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f4c9e54c Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f4c9e54c Branch: refs/heads/index_support Commit: f4c9e54c88fb9e69023da2b0a74ead76ad2a6f8d Parents: bda2d62 Author: navis.ryu <[email protected]> Authored: Thu Apr 2 20:04:27 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Apr 2 20:04:27 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 2 +- .../tajo/engine/codegen/CaseWhenEmitter.java | 4 +- .../tajo/engine/codegen/EvalCodeGenContext.java | 2 +- .../tajo/engine/codegen/EvalCodeGenerator.java | 22 ++---- .../apache/tajo/engine/planner/Projector.java | 9 ++- .../planner/physical/AggregationExec.java | 11 ++- .../engine/planner/physical/BNLJoinExec.java | 43 +---------- .../planner/physical/BSTIndexScanExec.java | 6 +- .../engine/planner/physical/CommonJoinExec.java | 79 ++++++++++++++++++++ .../DistinctGroupbyFirstAggregationExec.java | 20 +++-- .../DistinctGroupbyHashAggregationExec.java | 33 ++++---- .../DistinctGroupbySecondAggregationExec.java | 8 +- .../DistinctGroupbyThirdAggregationExec.java | 7 +- .../engine/planner/physical/EvalExprExec.java | 6 +- .../planner/physical/HashAggregateExec.java | 4 +- .../planner/physical/HashFullOuterJoinExec.java | 33 +------- .../engine/planner/physical/HashJoinExec.java | 32 +------- .../planner/physical/HashLeftAntiJoinExec.java | 2 +- .../planner/physical/HashLeftOuterJoinExec.java | 9 ++- .../planner/physical/HashLeftSemiJoinExec.java | 14 +--- .../engine/planner/physical/HavingExec.java | 8 +- .../physical/MergeFullOuterJoinExec.java | 34 +-------- .../engine/planner/physical/MergeJoinExec.java | 30 +------- .../engine/planner/physical/NLJoinExec.java | 30 +------- .../planner/physical/NLLeftOuterJoinExec.java | 28 +------ .../physical/RightOuterMergeJoinExec.java | 35 +-------- .../engine/planner/physical/SelectionExec.java | 8 +- .../engine/planner/physical/SeqScanExec.java | 31 +++++--- .../planner/physical/SortAggregateExec.java | 6 +- .../engine/planner/physical/WindowAggExec.java | 11 ++- .../org/apache/tajo/engine/utils/TupleUtil.java | 4 +- .../NonForwardQueryResultSystemScanner.java | 2 +- .../apache/tajo/master/exec/QueryExecutor.java | 2 +- .../apache/tajo/engine/eval/ExprTestBase.java | 3 +- .../apache/tajo/engine/eval/TestEvalTree.java | 68 +++++++++-------- .../tajo/engine/eval/TestEvalTreeUtil.java | 12 +-- .../org/apache/tajo/plan/LogicalPlanner.java | 2 +- .../plan/expr/AggregationFunctionCallEval.java | 22 ++---- .../apache/tajo/plan/expr/AlgebraicUtil.java | 6 +- .../tajo/plan/expr/BetweenPredicateEval.java | 79 +++++++++++++------- .../org/apache/tajo/plan/expr/BinaryEval.java | 10 ++- .../org/apache/tajo/plan/expr/CaseWhenEval.java | 23 +++--- .../org/apache/tajo/plan/expr/CastEval.java | 7 +- .../org/apache/tajo/plan/expr/ConstEval.java | 7 +- .../org/apache/tajo/plan/expr/EvalNode.java | 12 ++- .../org/apache/tajo/plan/expr/EvalTreeUtil.java | 3 +- .../org/apache/tajo/plan/expr/FieldEval.java | 29 +++---- .../org/apache/tajo/plan/expr/FunctionEval.java | 50 +++++++------ .../tajo/plan/expr/GeneralFunctionEval.java | 25 ++----- .../java/org/apache/tajo/plan/expr/InEval.java | 5 +- .../org/apache/tajo/plan/expr/IsNullEval.java | 6 +- .../java/org/apache/tajo/plan/expr/NotEval.java | 6 +- .../tajo/plan/expr/PartialBinaryExpr.java | 5 +- .../plan/expr/PatternMatchPredicateEval.java | 12 +-- .../apache/tajo/plan/expr/RowConstantEval.java | 4 +- .../org/apache/tajo/plan/expr/SignedEval.java | 6 +- .../org/apache/tajo/plan/expr/UnaryEval.java | 3 +- .../tajo/plan/expr/WindowFunctionEval.java | 21 +----- .../plan/exprrewrite/rules/ConstantFolding.java | 17 +++-- .../rewrite/rules/PartitionedTableRewriter.java | 3 +- 61 files changed, 463 insertions(+), 561 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6eca36d..26b24a9 100644 --- a/CHANGES +++ b/CHANGES @@ -11,6 +11,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1436: Add Bind method to EvalNode. (Contributed by navis, + Committed by jihoon) + TAJO-1495: Clean up CatalogStore. (jaehwa) TAJO-1460: Apply TAJO-1407 to ExternalSortExec. (Contributed by navis, http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 53773d8..221b341 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -326,7 +326,7 @@ public class TajoConf extends Configuration { $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes", (long)256 * 1048576), $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite - $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation + $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this is broken) // Client ----------------------------------------------------------------- $CLIENT_SESSION_EXPIRY_TIME("tajo.client.session.expiry-time-sec", 3600), // default time is one hour. http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CaseWhenEmitter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CaseWhenEmitter.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CaseWhenEmitter.java index 611d815..d09b290 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CaseWhenEmitter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CaseWhenEmitter.java @@ -197,9 +197,9 @@ class CaseWhenEmitter { BinaryEval bin = (BinaryEval) predicate; if (bin.getLeftExpr().getType() == EvalType.CONST) { - return bin.getLeftExpr().eval(null, null).asInt4(); + return bin.getLeftExpr().eval(null).asInt4(); } else { - return bin.getRightExpr().eval(null, null).asInt4(); + return bin.getRightExpr().eval(null).asInt4(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java index 8384de7..c8197b7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java @@ -54,7 +54,7 @@ public class EvalCodeGenContext extends TajoGeneratorAdapter { emitConstructor(); String methodName = "eval"; - String methodDesc = TajoGeneratorAdapter.getMethodDescription(Datum.class, new Class[]{Schema.class, Tuple.class}); + String methodDesc = TajoGeneratorAdapter.getMethodDescription(Datum.class, new Class[]{Tuple.class}); MethodVisitor evalMethod = classWriter.visitMethod(Opcodes.ACC_PUBLIC, methodName, methodDesc, null, null); evalMethod.visitCode(); this.methodvisitor = evalMethod; http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java index 1cb3755..cc740e7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java @@ -364,7 +364,7 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext> fieldIdx = context.schema.getColumnIdByName(columnRef.getSimpleName()); } - context.methodvisitor.visitVarInsn(Opcodes.ALOAD, 2); + context.methodvisitor.visitVarInsn(Opcodes.ALOAD, 1); context.push(fieldIdx); context.invokeInterface(Tuple.class, "isNull", boolean.class, new Class [] {int.class}); @@ -434,7 +434,7 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext> throw new InvalidEvalException(field.getValueType() + " is not supported yet"); } - context.methodvisitor.visitVarInsn(Opcodes.ALOAD, 2); + context.methodvisitor.visitVarInsn(Opcodes.ALOAD, 1); context.push(fieldIdx); context.invokeInterface(Tuple.class, methodName, returnType, paramTypes); @@ -740,13 +740,8 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext> public EvalNode visitInPredicate(EvalCodeGenContext context, EvalNode patternEval, Stack<EvalNode> stack) { String fieldName = context.symbols.get(patternEval); emitGetField(context, context.owner, fieldName, InEval.class); - if (context.schema != null) { - emitGetField(context, context.owner, "schema", Schema.class); - } else { - context.methodvisitor.visitInsn(Opcodes.ACONST_NULL); - } - context.aload(2); // tuple - context.invokeVirtual(InEval.class, "eval", Datum.class, new Class[]{Schema.class, Tuple.class}); + context.aload(1); // tuple + context.invokeVirtual(InEval.class, "eval", Datum.class, new Class[]{Tuple.class}); context.convertToPrimitive(patternEval.getValueType()); return patternEval; @@ -756,13 +751,8 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext> Class clazz = getStringPatternEvalClass(patternEval.getType()); String fieldName = context.symbols.get(patternEval); emitGetField(context, context.owner, fieldName, clazz); - if (context.schema != null) { - emitGetField(context, context.owner, "schema", Schema.class); - } else { - context.methodvisitor.visitInsn(Opcodes.ACONST_NULL); - } - context.aload(2); // tuple - context.invokeVirtual(clazz, "eval", Datum.class, new Class[]{Schema.class, Tuple.class}); + context.aload(1); // tuple + context.invokeVirtual(clazz, "eval", Datum.class, new Class[]{Tuple.class}); context.convertToPrimitive(patternEval.getValueType()); return patternEval; http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java index 7c2e81f..cec1862 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java @@ -58,11 +58,18 @@ public class Projector { evals[i] = this.targets[i].getEvalTree(); } } + init(); + } + + public void init() { + for (EvalNode eval : evals) { + eval.bind(inSchema); + } } public void eval(Tuple in, Tuple out) { for (int i = 0; i < evals.length; i++) { - out.put(i, evals[i].eval(inSchema, in)); + out.put(i, evals[i].eval(in)); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java index a4b9fe4..6d9e38a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java @@ -19,8 +19,8 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; +import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.worker.TaskAttemptContext; @@ -29,7 +29,7 @@ import java.io.IOException; public abstract class AggregationExec extends UnaryPhysicalExec { protected final int groupingKeyNum; - protected int groupingKeyIds[]; + protected final int groupingKeyIds[]; protected final int aggFunctionsNum; protected final AggregationFunctionCallEval aggFunctions[]; @@ -60,7 +60,10 @@ public abstract class AggregationExec extends UnaryPhysicalExec { } @Override - public void close() throws IOException { - super.close(); + public void init() throws IOException { + super.init(); + for (EvalNode aggFunction : aggFunctions) { + aggFunction.bind(inSchema); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java index 8b76097..6e1a553 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java @@ -18,9 +18,7 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -32,11 +30,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class BNLJoinExec extends BinaryPhysicalExec { - // from logical plan - private JoinNode plan; - private final boolean hasJoinQual; - private EvalNode joinQual; +public class BNLJoinExec extends CommonJoinExec { private List<Tuple> leftTupleSlots; private List<Tuple> rightTupleSlots; @@ -54,19 +48,9 @@ public class BNLJoinExec extends BinaryPhysicalExec { private final static int TUPLE_SLOT_SIZE = 10000; - // projection - private Projector projector; - public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan, final PhysicalExec leftExec, PhysicalExec rightExec) { - super(context, plan.getInSchema(), plan.getOutSchema(), leftExec, rightExec); - this.plan = plan; - this.joinQual = plan.getJoinQual(); - if (joinQual != null) { // if join type is not 'cross join' - hasJoinQual = true; - } else { - hasJoinQual = false; - } + super(context, plan, leftExec, rightExec); this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE); this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE); this.leftIterator = leftTupleSlots.iterator(); @@ -79,24 +63,11 @@ public class BNLJoinExec extends BinaryPhysicalExec { plan.setTargets(PlannerUtil.schemaToTargets(outSchema)); } - projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outputTuple = new VTuple(outSchema.size()); } - @Override - protected void compile() { - if (hasJoinQual) { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - } - - public JoinNode getPlan() { - return plan; - } - public Tuple next() throws IOException { if (leftTupleSlots.isEmpty()) { @@ -191,12 +162,7 @@ public class BNLJoinExec extends BinaryPhysicalExec { } frameTuple.set(leftTuple, rightIterator.next()); - if (hasJoinQual) { - if (joinQual.eval(inSchema, frameTuple).isTrue()) { - projector.eval(frameTuple, outputTuple); - return outputTuple; - } - } else { + if (!hasJoinQual || joinQual.eval(frameTuple).isTrue()) { projector.eval(frameTuple, outputTuple); return outputTuple; } @@ -224,8 +190,5 @@ public class BNLJoinExec extends BinaryPhysicalExec { leftTupleSlots = null; rightIterator = null; leftIterator = null; - plan = null; - joinQual = null; - projector = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 6adc523..be6c046 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -69,7 +69,11 @@ public class BSTIndexScanExec extends PhysicalExec { @Override public void init() throws IOException { + super.init(); progress = 0.0f; + if (qual != null) { + qual.bind(inSchema); + } } @Override @@ -111,7 +115,7 @@ public class BSTIndexScanExec extends PhysicalExec { } } else { while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) { - if (qual.eval(inSchema, tuple).isTrue()) { + if (qual.eval(tuple).isTrue()) { projector.eval(tuple, outTuple); return outTuple; } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java new file mode 100644 index 0000000..0781041 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; + +// common join exec except HashLeftOuterJoinExec +public abstract class CommonJoinExec extends BinaryPhysicalExec { + + // from logical plan + protected JoinNode plan; + protected final boolean hasJoinQual; + + protected EvalNode joinQual; + + // projection + protected Projector projector; + + public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, + PhysicalExec inner) { + super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), + plan.getOutSchema(), outer, inner); + this.plan = plan; + this.joinQual = plan.getJoinQual(); + this.hasJoinQual = plan.hasJoinQual(); + + // for projection + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); + } + + @Override + public void init() throws IOException { + super.init(); + if (hasJoinQual) { + joinQual.bind(inSchema); + } + } + + @Override + protected void compile() { + if (hasJoinQual) { + joinQual = context.getPrecompiledEval(inSchema, joinQual); + } + } + + public JoinNode getPlan() { + return plan; + } + + @Override + public void close() throws IOException { + super.close(); + plan = null; + joinQual = null; + projector = null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index 2c6cc7e..37bc5a7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -84,17 +84,15 @@ import java.util.Map.Entry; * */ -public class DistinctGroupbyFirstAggregationExec extends PhysicalExec { +public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { private static Log LOG = LogFactory.getLog(DistinctGroupbyFirstAggregationExec.class); private DistinctGroupbyNode plan; private boolean finished = false; private boolean preparedData = false; - private PhysicalExec child; private long totalNumRows; private int fetchedRows; - private float progress; private int[] groupingKeyIndexes; private NonDistinctHashAggregator nonDistinctHashAggregator; @@ -104,15 +102,13 @@ public class DistinctGroupbyFirstAggregationExec extends PhysicalExec { public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp) throws IOException { - super(context, plan.getInSchema(), plan.getOutSchema()); - this.child = subOp; + super(context, plan.getInSchema(), plan.getOutSchema(), subOp); this.plan = plan; } @Override public void init() throws IOException { super.init(); - child.init(); // finding grouping column index Column[] groupingColumns = plan.getGroupingColumns(); @@ -248,14 +244,16 @@ public class DistinctGroupbyFirstAggregationExec extends PhysicalExec { if (groupbyNode.hasAggFunctions()) { aggFunctions = groupbyNode.getAggFunctions(); aggFunctionsNum = aggFunctions.length; - for (AggregationFunctionCallEval eachFunction: aggFunctions) { - eachFunction.setFirstPhase(); - } } else { aggFunctions = new AggregationFunctionCallEval[0]; aggFunctionsNum = 0; } + for (AggregationFunctionCallEval eachFunction: aggFunctions) { + eachFunction.bind(inSchema); + eachFunction.setFirstPhase(); + } + dummyTuple = new VTuple(aggFunctionsNum); for (int i = 0; i < aggFunctionsNum; i++) { dummyTuple.put(i, NullDatum.get()); @@ -267,13 +265,13 @@ public class DistinctGroupbyFirstAggregationExec extends PhysicalExec { FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple); if (contexts != null) { for (int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } } else { // if the key occurs firstly contexts = new FunctionContext[aggFunctionsNum]; for (int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } nonDistinctAggrDatas.put(groupingKeyTuple, contexts); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index d3178db..e96e750 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; +import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.DistinctGroupbyNode; import org.apache.tajo.plan.logical.GroupbyNode; @@ -35,27 +36,29 @@ import java.io.IOException; import java.util.*; import java.util.Map.Entry; -public class DistinctGroupbyHashAggregationExec extends PhysicalExec { +public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { private boolean finished = false; + private DistinctGroupbyNode plan; private HashAggregator[] hashAggregators; - private PhysicalExec child; private int distinctGroupingKeyIds[]; private boolean first = true; private int groupbyNodeNum; private int outputColumnNum; private int totalNumRows; private int fetchedRows; - private float progress; private int[] resultColumnIdIndexes; public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp) throws IOException { - super(context, plan.getInSchema(), plan.getOutSchema()); + super(context, plan.getInSchema(), plan.getOutSchema(), subOp); + this.plan = plan; + } - this.child = subOp; - this.child.init(); + @Override + public void init() throws IOException { + super.init(); List<Integer> distinctGroupingKeyIdList = new ArrayList<Integer>(); for (Column col: plan.getGroupingColumns()) { @@ -72,7 +75,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { int idx = 0; distinctGroupingKeyIds = new int[distinctGroupingKeyIdList.size()]; for (Integer intVal: distinctGroupingKeyIdList) { - distinctGroupingKeyIds[idx++] = intVal.intValue(); + distinctGroupingKeyIds[idx++] = intVal; } List<GroupbyNode> groupbyNodes = plan.getSubPlans(); @@ -81,7 +84,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { int index = 0; for (GroupbyNode eachGroupby: groupbyNodes) { - hashAggregators[index++] = new HashAggregator(eachGroupby); + hashAggregators[index++] = new HashAggregator(eachGroupby, inSchema); } outputColumnNum = plan.getOutSchema().size(); @@ -295,10 +298,6 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { } } - @Override - public void init() throws IOException { - } - public void rescan() throws IOException { finished = false; for (int i = 0; i < hashAggregators.length; i++) { @@ -337,7 +336,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { int tupleSize; - public HashAggregator(GroupbyNode groupbyNode) throws IOException { + public HashAggregator(GroupbyNode groupbyNode, Schema schema) throws IOException { hashTable = new HashMap<Tuple, Map<Tuple, FunctionContext[]>>(10000); @@ -375,6 +374,10 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { aggFunctionsNum = 0; } + for (EvalNode aggFunction : aggFunctions) { + aggFunction.bind(schema); + } + tupleSize = groupingKeyIds.length + aggFunctionsNum; } @@ -401,13 +404,13 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec { FunctionContext[] contexts = distinctEntry.get(keyTuple); if (contexts != null) { for (int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } } else { // if the key occurs firstly contexts = new FunctionContext[aggFunctionsNum]; for (int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } distinctEntry.put(keyTuple, contexts); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java index cce9a24..7b01a9b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java @@ -77,7 +77,6 @@ package org.apache.tajo.engine.planner.physical; public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { private static Log LOG = LogFactory.getLog(DistinctGroupbySecondAggregationExec.class); private DistinctGroupbyNode plan; - private PhysicalExec child; private boolean finished = false; @@ -91,13 +90,11 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); this.plan = plan; - this.child = sortExec; } @Override public void init() throws IOException { - this.child.init(); - + super.init(); numGroupingColumns = plan.getGroupingColumns().length; List<GroupbyNode> groupbyNodes = plan.getSubPlans(); @@ -122,6 +119,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { nonDistinctAggrFunctions = eachGroupby.getAggFunctions(); if (nonDistinctAggrFunctions != null) { for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) { + eachFunction.bind(inSchema); eachFunction.setIntermediatePhase(); } nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; @@ -252,7 +250,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { return; } for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { - nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], inSchema, tuple); + nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], tuple); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java index 26f09da..7bd71e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -40,7 +40,6 @@ import java.util.*; public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { private static Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class); private DistinctGroupbyNode plan; - private PhysicalExec child; private boolean finished = false; @@ -56,12 +55,11 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); this.plan = plan; - this.child = sortExec; } @Override public void init() throws IOException { - this.child.init(); + super.init(); numGroupingColumns = plan.getGroupingColumns().length; resultTupleLength = numGroupingColumns; @@ -254,6 +252,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { aggrFunctions = groupbyNode.getAggFunctions(); if (aggrFunctions != null) { for (AggregationFunctionCallEval eachFunction: aggrFunctions) { + eachFunction.bind(inSchema); eachFunction.setFinalPhase(); } } @@ -269,7 +268,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { public void merge(Tuple tuple) { for (int i = 0; i < aggrFunctions.length; i++) { - aggrFunctions[i].merge(functionContexts[i], inSchema, tuple); + aggrFunctions[i].merge(functionContexts[i], tuple); } if (seq == 0 && nonDistinctAggr != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java index 4e02e67..32ec772 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java @@ -38,7 +38,11 @@ public class EvalExprExec extends PhysicalExec { @Override public void init() throws IOException { + super.init(); progress = 0.0f; + for (Target target : plan.getTargets()) { + target.getEvalTree().bind(inSchema); + } } @Override @@ -47,7 +51,7 @@ public class EvalExprExec extends PhysicalExec { Target [] targets = plan.getTargets(); Tuple t = new VTuple(targets.length); for (int i = 0; i < targets.length; i++) { - t.put(i, targets[i].getEvalTree().eval(inSchema, null)); + t.put(i, targets[i].getEvalTree().eval(null)); } executedOnce = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java index 0d1bf3d..8ffd503 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java @@ -58,13 +58,13 @@ public class HashAggregateExec extends AggregationExec { FunctionContext [] contexts = hashTable.get(keyTuple); if(contexts != null) { for(int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } } else { // if the key occurs firstly contexts = new FunctionContext[aggFunctionsNum]; for(int i = 0; i < aggFunctionsNum; i++) { contexts[i] = aggFunctions[i].newContext(); - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } hashTable.put(keyTuple, contexts); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 9cd13fb..6e28ae0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -19,12 +19,8 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; -import org.apache.tajo.engine.codegen.CompilationError; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -35,10 +31,7 @@ import java.io.IOException; import java.util.*; -public class HashFullOuterJoinExec extends BinaryPhysicalExec { - // from logical plan - protected JoinNode plan; - protected EvalNode joinQual; +public class HashFullOuterJoinExec extends CommonJoinExec { protected List<Column[]> joinKeyPairs; @@ -57,19 +50,13 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { protected boolean finished = false; protected boolean shouldGetLeftTuple = true; - // projection - protected final Projector projector; - private int rightNumCols; private int leftNumCols; private Map<Tuple, Boolean> matched; public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { - super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), - plan.getOutSchema(), outer, inner); - this.plan = plan; - this.joinQual = plan.getJoinQual(); + super(context, plan, outer, inner); this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000); // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key, @@ -91,9 +78,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); } - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); @@ -103,11 +87,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { rightNumCols = inner.getSchema().size(); } - @Override - protected void compile() throws CompilationError { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { for (int i = 0; i < leftKeyList.length; i++) { keyTuple.put(i, outerTuple.get(leftKeyList[i])); @@ -186,7 +165,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { rightTuple = iterator.next(); frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples - if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable + if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable projector.eval(frameTuple, outTuple); found = true; getKeyLeftTuple(leftTuple, leftKeyTuple); @@ -247,12 +226,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { tupleSlots = null; matched = null; iterator = null; - plan = null; - joinQual = null; - } - - public JoinNode getPlan() { - return this.plan; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 3bdf2d4..48f3682 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -19,12 +19,9 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.CacheHolder; import org.apache.tajo.engine.utils.TableCacheKey; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.FrameTuple; @@ -36,10 +33,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.*; -public class HashJoinExec extends BinaryPhysicalExec { - // from logical plan - protected JoinNode plan; - protected EvalNode joinQual; +public class HashJoinExec extends CommonJoinExec { protected List<Column[]> joinKeyPairs; @@ -58,17 +52,11 @@ public class HashJoinExec extends BinaryPhysicalExec { protected boolean finished = false; protected boolean shouldGetLeftTuple = true; - // projection - protected final Projector projector; - private TableStats cachedRightTableStats; public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) { - super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(), - leftExec, rightExec); - this.plan = plan; - this.joinQual = plan.getJoinQual(); + super(context, plan, leftExec, rightExec); // HashJoin only can manage equi join key pairs. this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(), @@ -85,20 +73,12 @@ public class HashJoinExec extends BinaryPhysicalExec { rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); } - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); leftKeyTuple = new VTuple(leftKeyList.length); } - @Override - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { for (int i = 0; i < leftKeyList.length; i++) { keyTuple.put(i, outerTuple.get(leftKeyList[i])); @@ -137,7 +117,7 @@ public class HashJoinExec extends BinaryPhysicalExec { // getting a next right tuple on in-memory hash table. rightTuple = iterator.next(); frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples - if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable + if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable projector.eval(frameTuple, outTuple); found = true; } @@ -232,12 +212,6 @@ public class HashJoinExec extends BinaryPhysicalExec { } iterator = null; - plan = null; - joinQual = null; - } - - public JoinNode getPlan() { - return this.plan; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java index cceed3e..881bf84 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java @@ -92,7 +92,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec { while (!context.isStopped() && notFound && iterator.hasNext()) { rightTuple = iterator.next(); frameTuple.set(leftTuple, rightTuple); - if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found + if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found notFound = false; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index 81ac02c..fa9ba94 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -120,6 +120,11 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { leftKeyTuple = new VTuple(leftKeyList.length); rightNumCols = rightChild.getSchema().size(); + + joinQual.bind(inSchema); + if (joinFilter != null) { + joinFilter.bind(inSchema); + } } @Override @@ -177,8 +182,8 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples // if there is no join filter, it is always true. - boolean satisfiedWithFilter = joinFilter == null ? true : joinFilter.eval(inSchema, frameTuple).isTrue(); - boolean satisfiedWithJoinCondition = joinQual.eval(inSchema, frameTuple).isTrue(); + boolean satisfiedWithFilter = joinFilter == null || joinFilter.eval(frameTuple).isTrue(); + boolean satisfiedWithJoinCondition = joinQual.eval(frameTuple).isTrue(); // if a composited tuple satisfies with both join filter and join condition if (satisfiedWithJoinCondition && satisfiedWithFilter) { http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 37c6d0e..32e6d08 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -19,10 +19,8 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.worker.TaskAttemptContext; -import org.apache.tajo.datum.NullDatum; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import java.io.IOException; import java.util.List; @@ -33,20 +31,10 @@ import java.util.List; * If found, it returns the tuple of the FROM side table. */ public class HashLeftSemiJoinExec extends HashJoinExec { - private Tuple rightNullTuple; public HashLeftSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild, PhysicalExec inSideChild) { super(context, plan, fromSideChild, inSideChild); - // NUll Tuple - rightNullTuple = new VTuple(leftChild.outColumnNum); - for (int i = 0; i < leftChild.outColumnNum; i++) { - rightNullTuple.put(i, NullDatum.get()); - } - } - - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); } /** @@ -95,7 +83,7 @@ public class HashLeftSemiJoinExec extends HashJoinExec { while (notFound && iterator.hasNext()) { rightTuple = iterator.next(); frameTuple.set(leftTuple, rightTuple); - if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found + if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found notFound = false; projector.eval(frameTuple, outTuple); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java index e9a7c03..b71c770 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java @@ -37,10 +37,16 @@ public class HavingExec extends UnaryPhysicalExec { } @Override + public void init() throws IOException { + super.init(); + qual.bind(inSchema); + } + + @Override public Tuple next() throws IOException { Tuple tuple; while (!context.isStopped() && (tuple = child.next()) != null) { - if (qual.eval(inSchema, tuple).isTrue()) { + if (qual.eval(tuple).isTrue()) { return tuple; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index 3f2e431..13b73c3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -20,9 +20,7 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -35,10 +33,7 @@ import java.util.ArrayList; import java.util.List; -public class MergeFullOuterJoinExec extends BinaryPhysicalExec { - // from logical plan - private JoinNode joinNode; - private EvalNode joinQual; +public class MergeFullOuterJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join private FrameTuple frameTuple; @@ -57,9 +52,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { private boolean end = false; - // projection - private Projector projector; - private int rightNumCols; private int leftNumCols; private int posRightTupleSlots = -1; @@ -69,12 +61,9 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) { - super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild); + super(context, plan, leftChild, rightChild); Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " + "but there is no join condition"); - this.joinNode = plan; - this.joinQual = plan.getJoinQual(); - this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); SortSpec[][] sortSpecs = new SortSpec[2][]; @@ -86,9 +75,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual( plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema()); - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); @@ -97,15 +83,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { rightNumCols = rightChild.getSchema().size(); } - @Override - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - - public JoinNode getPlan(){ - return this.joinNode; - } - public Tuple next() throws IOException { Tuple previous; @@ -292,7 +269,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots)); posRightTupleSlots = posRightTupleSlots + 1; frameTuple.set(leftNext, aTuple); - joinQual.eval(inSchema, frameTuple); + joinQual.eval(frameTuple); projector.eval(frameTuple, outTuple); return outTuple; } else { @@ -306,7 +283,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { posLeftTupleSlots = posLeftTupleSlots + 1; frameTuple.set(leftNext, aTuple); - joinQual.eval(inSchema, frameTuple); + joinQual.eval(frameTuple); projector.eval(frameTuple, outTuple); return outTuple; } @@ -333,8 +310,5 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { rightTupleSlots.clear(); leftTupleSlots = null; rightTupleSlots = null; - joinNode = null; - joinQual = null; - projector = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index 63f48ac..bf9b4cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -20,8 +20,6 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.engine.planner.Projector; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -34,10 +32,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class MergeJoinExec extends BinaryPhysicalExec { - // from logical plan - private JoinNode joinNode; - private EvalNode joinQual; +public class MergeJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join private FrameTuple frameTuple; @@ -58,16 +53,11 @@ public class MergeJoinExec extends BinaryPhysicalExec { private boolean end = false; - // projection - private Projector projector; - public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) { - super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner); + super(context, plan, outer, inner); Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " + "but there is no join condition"); - this.joinNode = plan; - this.joinQual = plan.getJoinQual(); this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); @@ -82,23 +72,11 @@ public class MergeJoinExec extends BinaryPhysicalExec { this.outerIterator = outerTupleSlots.iterator(); this.innerIterator = innerTupleSlots.iterator(); - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); } - @Override - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - - public JoinNode getPlan(){ - return this.joinNode; - } - public Tuple next() throws IOException { Tuple previous; @@ -165,7 +143,7 @@ public class MergeJoinExec extends BinaryPhysicalExec { frameTuple.set(outerNext, innerIterator.next()); - if (joinQual.eval(inSchema, frameTuple).isTrue()) { + if (joinQual.eval(frameTuple).isTrue()) { projector.eval(frameTuple, outTuple); return outTuple; } @@ -192,7 +170,5 @@ public class MergeJoinExec extends BinaryPhysicalExec { innerTupleSlots = null; outerIterator = null; innerIterator = null; - joinQual = null; - projector = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java index 5e7ab98..964a523 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java @@ -18,8 +18,6 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.engine.planner.Projector; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -28,11 +26,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -public class NLJoinExec extends BinaryPhysicalExec { - // from logical plan - private JoinNode plan; - private EvalNode joinQual; - +public class NLJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join private boolean needNewOuter; @@ -41,31 +35,15 @@ public class NLJoinExec extends BinaryPhysicalExec { private Tuple innerTuple = null; private Tuple outTuple = null; - // projection - private final Projector projector; - public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { - super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner); - this.plan = plan; - - if (plan.hasJoinQual()) { - this.joinQual = plan.getJoinQual(); - } - - // for projection - projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - + super(context, plan, outer, inner); // for join needNewOuter = true; frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); } - public JoinNode getPlan() { - return this.plan; - } - public Tuple next() throws IOException { while (!context.isStopped()) { if (needNewOuter) { @@ -84,8 +62,8 @@ public class NLJoinExec extends BinaryPhysicalExec { } frameTuple.set(outerTuple, innerTuple); - if (joinQual != null) { - if (joinQual.eval(inSchema, frameTuple).isTrue()) { + if (hasJoinQual) { + if (joinQual.eval(frameTuple).isTrue()) { projector.eval(frameTuple, outTuple); return outTuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java index 7959d47..735623d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java @@ -18,9 +18,7 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -29,11 +27,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -public class NLLeftOuterJoinExec extends BinaryPhysicalExec { - // from logical plan - private JoinNode plan; - private EvalNode joinQual; - +public class NLLeftOuterJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join private boolean needNextRightTuple; private FrameTuple frameTuple; @@ -41,24 +35,12 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec { private Tuple rightTuple = null; private Tuple outTuple = null; - // projection - private final Projector projector; - private boolean foundAtLeastOneMatch; private int rightNumCols; public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, PhysicalExec rightChild) { - super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild); - this.plan = plan; - - if (plan.hasJoinQual()) { - this.joinQual = plan.getJoinQual(); - } - - // for projection - projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - + super(context, plan, leftChild, rightChild); // for join needNextRightTuple = true; frameTuple = new FrameTuple(); @@ -68,10 +50,6 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec { rightNumCols = rightChild.getSchema().size(); } - public JoinNode getPlan() { - return this.plan; - } - public Tuple next() throws IOException { while (!context.isStopped()) { if (needNextRightTuple) { @@ -106,7 +84,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec { frameTuple.set(leftTuple, rightTuple); ; - if (joinQual.eval(inSchema, frameTuple).isTrue()) { + if (joinQual.eval(frameTuple).isTrue()) { projector.eval(frameTuple, outTuple); foundAtLeastOneMatch = true; return outTuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index df1c09d..7abfbe6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -21,9 +21,7 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; @@ -35,11 +33,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -public class RightOuterMergeJoinExec extends BinaryPhysicalExec { - // from logical plan - private JoinNode joinNode; - private EvalNode joinQual; - +public class RightOuterMergeJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join private FrameTuple frameTuple; private Tuple leftTuple = null; @@ -57,9 +51,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { private boolean end = false; - // projection - private Projector projector; - private int leftNumCols; private int posRightTupleSlots = -1; private int posLeftTupleSlots = -1; @@ -68,12 +59,9 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { public RightOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) { - super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner); + super(context, plan, outer, inner); Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " + "but there is no join condition"); - this.joinNode = plan; - this.joinQual = plan.getJoinQual(); - this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); SortSpec[][] sortSpecs = new SortSpec[2][]; @@ -84,9 +72,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual( plan.getJoinQual(), outer.getSchema(), inner.getSchema()); - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - // for join frameTuple = new FrameTuple(); outTuple = new VTuple(outSchema.size()); @@ -94,15 +79,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { leftNumCols = outer.getSchema().size(); } - @Override - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - - public JoinNode getPlan() { - return this.joinNode; - } - /** * creates a tuple of a given size filled with NULL values in all fields */ @@ -302,7 +278,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { posRightTupleSlots = posRightTupleSlots + 1; frameTuple.set(nextLeft, aTuple); - if (joinQual.eval(inSchema, frameTuple).asBool()) { + if (joinQual.eval(frameTuple).asBool()) { projector.eval(frameTuple, outTuple); return outTuple; } else { @@ -324,7 +300,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { frameTuple.set(nextLeft, aTuple); - if (joinQual.eval(inSchema, frameTuple).asBool()) { + if (joinQual.eval(frameTuple).asBool()) { projector.eval(frameTuple, outTuple); return outTuple; } else { @@ -357,9 +333,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { innerTupleSlots.clear(); leftTupleSlots = null; innerTupleSlots = null; - joinNode = null; - joinQual = null; - projector = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java index b9273fa..c090fa7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java @@ -37,6 +37,12 @@ public class SelectionExec extends UnaryPhysicalExec { } @Override + public void init() throws IOException { + super.init(); + qual.bind(inSchema); + } + + @Override public void compile() throws CompilationError { qual = context.getPrecompiledEval(inSchema, qual); } @@ -45,7 +51,7 @@ public class SelectionExec extends UnaryPhysicalExec { public Tuple next() throws IOException { Tuple tuple; while (!context.isStopped() && (tuple = child.next()) != null) { - if (qual.eval(inSchema, tuple).isTrue()) { + if (qual.eval(tuple).isTrue()) { return tuple; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 1078c80..671555c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -27,6 +27,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.plan.Target; @@ -69,9 +70,8 @@ public class SeqScanExec extends ScanExec { this.qual = plan.getQual(); this.fragments = fragments; - if (fragments != null - && plan.getTableDesc().hasPartition() - && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) { + if (plan.getTableDesc().hasPartition() && + plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) { rewriteColumnPartitionedTableSchema(); } } @@ -94,21 +94,26 @@ public class SeqScanExec extends ScanExec { // Remove partition key columns from an input schema. this.inSchema = plan.getTableDesc().getSchema(); - List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); + Tuple partitionRow = null; + if (fragments != null && fragments.length > 0) { + List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); - // Get a partition key value from a given path - Tuple partitionRow = - PartitionedTableRewriter.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), - false); + // Get a partition key value from a given path + partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( + columnPartitionSchema, fileFragments.get(0).getPath(), false); + } // Targets or search conditions may contain column references. // However, actual values absent in tuples. So, Replace all column references by constant datum. for (Column column : columnPartitionSchema.toArray()) { FieldEval targetExpr = new FieldEval(column); - Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow); + Datum datum = NullDatum.get(); + if (partitionRow != null) { + targetExpr.bind(columnPartitionSchema); + datum = targetExpr.eval(partitionRow); + } ConstEval constExpr = new ConstEval(datum); - for (int i = 0; i < plan.getTargets().length; i++) { Target target = plan.getTargets()[i]; @@ -156,6 +161,10 @@ public class SeqScanExec extends ScanExec { initScanner(projected); super.init(); + + if (plan.hasQual()) { + qual.bind(inSchema); + } } @Override @@ -211,7 +220,7 @@ public class SeqScanExec extends ScanExec { } } else { while ((tuple = scanner.next()) != null) { - if (qual.eval(inSchema, tuple).isTrue()) { + if (qual.eval(tuple).isTrue()) { projector.eval(tuple, outTuple); return outTuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java index 425eb86..9831d83 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java @@ -73,14 +73,14 @@ public class SortAggregateExec extends AggregationExec { // Merge when aggregator doesn't receive NullDatum if (!(groupingKeyNum == 0 && aggFunctionsNum == tuple.size() && tuple.get(i) == NullDatum.get())) { - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } } lastKey = currentKey; } else { // aggregate for (int i = 0; i < aggFunctionsNum; i++) { - aggFunctions[i].merge(contexts[i], inSchema, tuple); + aggFunctions[i].merge(contexts[i], tuple); } } @@ -98,7 +98,7 @@ public class SortAggregateExec extends AggregationExec { for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) { contexts[evalIdx] = aggFunctions[evalIdx].newContext(); - aggFunctions[evalIdx].merge(contexts[evalIdx], inSchema, tuple); + aggFunctions[evalIdx].merge(contexts[evalIdx], tuple); } lastKey = currentKey; http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java index e2dbf7a..2f1fc46 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -23,6 +23,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.WindowFunctionEval; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.WindowAggNode; @@ -178,6 +179,14 @@ public class WindowAggExec extends UnaryPhysicalExec { outputColumnNum = nonFunctionColumnNum + functionNum; } + @Override + public void init() throws IOException { + super.init(); + for (EvalNode functionEval : functions) { + functionEval.bind(inSchema); + } + } + private void transition(WindowState state) { this.state = state; } @@ -303,7 +312,7 @@ public class WindowAggExec extends UnaryPhysicalExec { Tuple inTuple = accumulatedInTuples.get(i); Tuple outTuple = evaluatedTuples.get(i); - functions[idx].merge(contexts[idx], inSchema, inTuple); + functions[idx].merge(contexts[idx], inTuple); if (windowFuncFlags[idx]) { Datum result = functions[idx].terminate(contexts[idx]); http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index 027da50..6b2c37c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -37,7 +36,6 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.StringUtils; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; @@ -219,7 +217,7 @@ public class TupleUtil { Tuple tuple; while (iterator.hasNext()) { tuple = iterator.next(); - if (qual.eval(schema, tuple).isTrue()) { + if (qual.eval(tuple).isTrue()) { results.add(tuple); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 5901aa7..80bdb86 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -684,7 +684,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult } else { while (currentRow < cachedData.size()) { aTuple = cachedData.get(currentRow++); - if (qual.eval(inSchema, aTuple).isTrue()) { + if (qual.eval(aTuple).isTrue()) { projector.eval(aTuple, outTuple); return outTuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 75e7762..cec1125 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -274,7 +274,7 @@ public class QueryExecutor { final Tuple outTuple = new VTuple(targets.length); for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); - outTuple.put(i, eval.eval(null, null)); + outTuple.put(i, eval.eval(null)); } boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; if (isInsert) { http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 876e3e4..e2dac05 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -281,8 +281,9 @@ public class ExprTestBase { if (queryContext.getBool(SessionVars.CODEGEN)) { eval = codegen.compile(inputSchema, eval); } + eval.bind(inputSchema); - outTuple.put(i, eval.eval(inputSchema, vtuple)); + outTuple.put(i, eval.eval(vtuple)); } try { http://git-wip-us.apache.org/repos/asf/tajo/blob/f4c9e54c/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java index 4c1efd6..f03988b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java @@ -45,13 +45,15 @@ public class TestEvalTree extends ExprTestBase { schema1.addColumn("table1.score", INT4); BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2); + expr.bind(schema1); + assertCloneEqual(expr); VTuple tuple = new VTuple(2); tuple.put(0, DatumFactory.createInt4(1)); // put 0th field - tuple.put(1, DatumFactory.createInt4(99)); // put 0th field + tuple.put(1, DatumFactory.createInt4(99)); // put 1th field // the result of evaluation must be 100. - assertEquals(expr.eval(schema1, tuple).asInt4(), 100); + assertEquals(expr.eval(tuple).asInt4(), 100); } public static class MockTrueEval extends EvalNode { @@ -76,7 +78,7 @@ public class TestEvalTree extends ExprTestBase { } @Override - public Datum eval(Schema schema, Tuple tuple) { + public Datum eval(Tuple tuple) { return DatumFactory.createBool(true); } @@ -108,7 +110,7 @@ public class TestEvalTree extends ExprTestBase { } @Override - public Datum eval(Schema schema, Tuple tuple) { + public Datum eval(Tuple tuple) { return DatumFactory.createBool(false); } @@ -154,16 +156,16 @@ public class TestEvalTree extends ExprTestBase { MockFalseExpr falseExpr = new MockFalseExpr(); BinaryEval andExpr = new BinaryEval(EvalType.AND, trueExpr, trueExpr); - assertTrue(andExpr.eval(null, null).asBool()); + assertTrue(andExpr.eval(null).asBool()); andExpr = new BinaryEval(EvalType.AND, falseExpr, trueExpr); - assertFalse(andExpr.eval(null, null).asBool()); + assertFalse(andExpr.eval(null).asBool()); andExpr = new BinaryEval(EvalType.AND, trueExpr, falseExpr); - assertFalse(andExpr.eval(null, null).asBool()); + assertFalse(andExpr.eval(null).asBool()); andExpr = new BinaryEval(EvalType.AND, falseExpr, falseExpr); - assertFalse(andExpr.eval(null, null).asBool()); + assertFalse(andExpr.eval(null).asBool()); } @Test @@ -172,16 +174,16 @@ public class TestEvalTree extends ExprTestBase { MockFalseExpr falseExpr = new MockFalseExpr(); BinaryEval orExpr = new BinaryEval(EvalType.OR, trueExpr, trueExpr); - assertTrue(orExpr.eval(null, null).asBool()); + assertTrue(orExpr.eval(null).asBool()); orExpr = new BinaryEval(EvalType.OR, falseExpr, trueExpr); - assertTrue(orExpr.eval(null, null).asBool()); + assertTrue(orExpr.eval(null).asBool()); orExpr = new BinaryEval(EvalType.OR, trueExpr, falseExpr); - assertTrue(orExpr.eval(null, null).asBool()); + assertTrue(orExpr.eval(null).asBool()); orExpr = new BinaryEval(EvalType.OR, falseExpr, falseExpr); - assertFalse(orExpr.eval(null, null).asBool()); + assertFalse(orExpr.eval(null).asBool()); } @Test @@ -194,41 +196,41 @@ public class TestEvalTree extends ExprTestBase { e1 = new ConstEval(DatumFactory.createInt4(9)); e2 = new ConstEval(DatumFactory.createInt4(34)); expr = new BinaryEval(EvalType.LTH, e1, e2); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, e1, e2); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.LTH, e2, e1); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, e2, e1); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, e2, e1); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, e2, e1); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, e1, e2); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, e1, e2); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); BinaryEval plus = new BinaryEval(EvalType.PLUS, e1, e2); expr = new BinaryEval(EvalType.LTH, e1, plus); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, e1, plus); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.LTH, plus, e1); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, plus, e1); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, plus, e1); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, plus, e1); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, e1, plus); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, e1, plus); - assertFalse(expr.eval(null, null).asBool()); + assertFalse(expr.eval(null).asBool()); } @Test @@ -241,28 +243,28 @@ public class TestEvalTree extends ExprTestBase { e1 = new ConstEval(DatumFactory.createInt4(9)); e2 = new ConstEval(DatumFactory.createInt4(34)); BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2); - assertEquals(expr.eval(null, null).asInt4(), 43); + assertEquals(expr.eval(null).asInt4(), 43); assertCloneEqual(expr); // MINUS e1 = new ConstEval(DatumFactory.createInt4(5)); e2 = new ConstEval(DatumFactory.createInt4(2)); expr = new BinaryEval(EvalType.MINUS, e1, e2); - assertEquals(expr.eval(null, null).asInt4(), 3); + assertEquals(expr.eval(null).asInt4(), 3); assertCloneEqual(expr); // MULTIPLY e1 = new ConstEval(DatumFactory.createInt4(5)); e2 = new ConstEval(DatumFactory.createInt4(2)); expr = new BinaryEval(EvalType.MULTIPLY, e1, e2); - assertEquals(expr.eval(null, null).asInt4(), 10); + assertEquals(expr.eval(null).asInt4(), 10); assertCloneEqual(expr); // DIVIDE e1 = new ConstEval(DatumFactory.createInt4(10)); e2 = new ConstEval(DatumFactory.createInt4(5)); expr = new BinaryEval(EvalType.DIVIDE, e1, e2); - assertEquals(expr.eval(null, null).asInt4(), 2); + assertEquals(expr.eval(null).asInt4(), 2); assertCloneEqual(expr); } @@ -278,7 +280,7 @@ public class TestEvalTree extends ExprTestBase { assertEquals(CatalogUtil.newSimpleDataType(INT4), expr.getValueType()); expr = new BinaryEval(EvalType.LTH, e1, e2); - assertTrue(expr.eval(null, null).asBool()); + assertTrue(expr.eval(null).asBool()); assertEquals(CatalogUtil.newSimpleDataType(BOOLEAN), expr.getValueType()); e1 = new ConstEval(DatumFactory.createFloat8(9.3));
