http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.plan ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.plan b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.plan new file mode 100644 index 0000000..9d713aa --- /dev/null +++ b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.plan @@ -0,0 +1,80 @@ +explain +------------------------------- +TABLE_SUBQUERY(7) as default.result + => Targets: default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT) + => out schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT)} + => in schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT)} + UNION(6) + PROJECTION(5) + => Targets: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), bbc as query + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), query (TEXT)} + => in schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + SCAN(3) on default.lineitem + => filter: default.lineitem.l_orderkey (INT4) = 1 + => target list: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4) + => out schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)} + PROJECTION(2) + => Targets: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), abc as query + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), query (TEXT)} + => in schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + SCAN(0) on default.lineitem + => filter: default.lineitem.l_orderkey (INT4) = 1 + => target list: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4) + => out schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)} +explain +------------------------------- +------------------------------------------------------------------------------- +Execution Block Graph (TERMINAL - eb_0000000000000_0000_000003) +------------------------------------------------------------------------------- +|-eb_0000000000000_0000_000003 + |-eb_0000000000000_0000_000002 + |-eb_0000000000000_0000_000001 +------------------------------------------------------------------------------- +Order of Execution +------------------------------------------------------------------------------- +1: eb_0000000000000_0000_000001 +2: eb_0000000000000_0000_000002 +3: eb_0000000000000_0000_000003 +------------------------------------------------------------------------------- + +======================================================= +Block Id: eb_0000000000000_0000_000001 [ROOT] +======================================================= + +TABLE_SUBQUERY(11) as default.result + => Targets: default.result.l_orderkey (INT4) as default.result.l_orderkey, default.result.l_partkey (INT4) as default.result.l_partkey, default.result.query (TEXT) as default.result.query + => out schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT)} + => in schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT)} + PROJECTION(2) + => Targets: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), abc as query + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), query (TEXT)} + => in schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + SCAN(0) on default.lineitem + => filter: default.lineitem.l_orderkey (INT4) = 1 + => target list: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4) + => out schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)} + +======================================================= +Block Id: eb_0000000000000_0000_000002 [ROOT] +======================================================= + +TABLE_SUBQUERY(12) as default.result + => Targets: default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT) + => out schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT)} + => in schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.query (TEXT)} + PROJECTION(5) + => Targets: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), bbc as query + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), query (TEXT)} + => in schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + SCAN(3) on default.lineitem + => filter: default.lineitem.l_orderkey (INT4) = 1 + => target list: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4) + => out schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)} + => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)} + +======================================================= +Block Id: eb_0000000000000_0000_000003 [TERMINAL] +=======================================================
http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.result b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.result new file mode 100644 index 0000000..cf07e63 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion1.result @@ -0,0 +1,6 @@ +l_orderkey,l_partkey,query +------------------------------- +1,1,abc +1,1,abc +1,1,bbc +1,1,bbc http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.plan ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.plan b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.plan new file mode 100644 index 0000000..2e2280d --- /dev/null +++ b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.plan @@ -0,0 +1,116 @@ +explain +------------------------------- +TABLE_SUBQUERY(8) as default.result + => Targets: default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.url (TEXT) + => out schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.url (TEXT)} + => in schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.url (TEXT)} + JOIN(11)(INNER) + => Join Cond: default.res1.l_partkey (INT4) = default.res2.p_partkey (INT4) + => target list: default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT) + => out schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + => in schema: {(4) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT), default.res2.p_partkey (INT4)} + TABLE_SUBQUERY(5) as default.res2 + => Targets: default.res2.p_partkey (INT4) + => out schema: {(1) default.res2.p_partkey (INT4)} + => in schema: {(9) default.res2.p_brand (TEXT), default.res2.p_comment (TEXT), default.res2.p_container (TEXT), default.res2.p_mfgr (TEXT), default.res2.p_name (TEXT), default.res2.p_partkey (INT4), default.res2.p_retailprice (FLOAT8), default.res2.p_size (INT4), default.res2.p_type (TEXT)} + SCAN(3) on default.part + => target list: default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT) + => out schema: {(9) default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT)} + => in schema: {(9) default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT)} + TABLE_SUBQUERY(2) as default.res1 + => Targets: default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT) + => out schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + => in schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + SCAN(0) on default.lineitem + => target list: CASE WHEN default.lineitem.l_partkey (INT4) IS NOT NULL THEN WHEN default.lineitem.l_orderkey (INT4) = 1 THEN 1 ELSE 2 END as url, default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4) + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), url (TEXT)} + => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)} +explain +------------------------------- +------------------------------------------------------------------------------- +Execution Block Graph (TERMINAL - eb_0000000000000_0000_000004) +------------------------------------------------------------------------------- +|-eb_0000000000000_0000_000004 + |-eb_0000000000000_0000_000003 + |-eb_0000000000000_0000_000002 + |-eb_0000000000000_0000_000001 +------------------------------------------------------------------------------- +Order of Execution +------------------------------------------------------------------------------- +1: eb_0000000000000_0000_000001 +2: eb_0000000000000_0000_000002 +3: eb_0000000000000_0000_000003 +4: eb_0000000000000_0000_000004 +------------------------------------------------------------------------------- + +======================================================= +Block Id: eb_0000000000000_0000_000001 [LEAF] +======================================================= + +[Outgoing] +[q_0000000000000_0000] 1 => 3 (type=HASH_SHUFFLE, key=default.res1.l_partkey (INT4), num=32) + +TABLE_SUBQUERY(2) as default.res1 + => Targets: default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT) + => out schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + => in schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + PROJECTION(1) + => Targets: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), url (TEXT) + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), url (TEXT)} + => in schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), url (TEXT)} + SCAN(0) on default.lineitem + => target list: CASE WHEN default.lineitem.l_partkey (INT4) IS NOT NULL THEN WHEN default.lineitem.l_orderkey (INT4) = 1 THEN 1 ELSE 2 END as url, default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4) + => out schema: {(3) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), url (TEXT)} + => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)} + +======================================================= +Block Id: eb_0000000000000_0000_000002 [LEAF] +======================================================= + +[Outgoing] +[q_0000000000000_0000] 2 => 3 (type=HASH_SHUFFLE, key=default.res2.p_partkey (INT4), num=32) + +TABLE_SUBQUERY(5) as default.res2 + => Targets: default.res2.p_partkey (INT4) + => out schema: {(1) default.res2.p_partkey (INT4)} + => in schema: {(9) default.res2.p_brand (TEXT), default.res2.p_comment (TEXT), default.res2.p_container (TEXT), default.res2.p_mfgr (TEXT), default.res2.p_name (TEXT), default.res2.p_partkey (INT4), default.res2.p_retailprice (FLOAT8), default.res2.p_size (INT4), default.res2.p_type (TEXT)} + PROJECTION(4) + => Targets: default.part.p_partkey (INT4), default.part.p_name (TEXT), default.part.p_mfgr (TEXT), default.part.p_brand (TEXT), default.part.p_type (TEXT), default.part.p_size (INT4), default.part.p_container (TEXT), default.part.p_retailprice (FLOAT8), default.part.p_comment (TEXT) + => out schema: {(9) default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT)} + => in schema: {(9) default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT)} + SCAN(3) on default.part + => target list: default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT) + => out schema: {(9) default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT)} + => in schema: {(9) default.part.p_brand (TEXT), default.part.p_comment (TEXT), default.part.p_container (TEXT), default.part.p_mfgr (TEXT), default.part.p_name (TEXT), default.part.p_partkey (INT4), default.part.p_retailprice (FLOAT8), default.part.p_size (INT4), default.part.p_type (TEXT)} + +======================================================= +Block Id: eb_0000000000000_0000_000003 [ROOT] +======================================================= + +[Incoming] +[q_0000000000000_0000] 1 => 3 (type=HASH_SHUFFLE, key=default.res1.l_partkey (INT4), num=32) +[q_0000000000000_0000] 2 => 3 (type=HASH_SHUFFLE, key=default.res2.p_partkey (INT4), num=32) + +TABLE_SUBQUERY(8) as default.result + => Targets: default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.url (TEXT) + => out schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.url (TEXT)} + => in schema: {(3) default.result.l_orderkey (INT4), default.result.l_partkey (INT4), default.result.url (TEXT)} + PROJECTION(7) + => Targets: default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT) + => out schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + => in schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + JOIN(11)(INNER) + => Join Cond: default.res1.l_partkey (INT4) = default.res2.p_partkey (INT4) + => target list: default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT) + => out schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + => in schema: {(4) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT), default.res2.p_partkey (INT4)} + SCAN(14) on eb_0000000000000_0000_000002 + => out schema: {(1) default.res2.p_partkey (INT4)} + => in schema: {(1) default.res2.p_partkey (INT4)} + SCAN(13) on eb_0000000000000_0000_000001 + => out schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + => in schema: {(3) default.res1.l_orderkey (INT4), default.res1.l_partkey (INT4), default.res1.url (TEXT)} + +======================================================= +Block Id: eb_0000000000000_0000_000004 [TERMINAL] +======================================================= http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.result b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.result new file mode 100644 index 0000000..da92d8c --- /dev/null +++ b/tajo-core/src/test/resources/results/TestUnionQuery/testComplexUnion2.result @@ -0,0 +1,7 @@ +l_orderkey,l_partkey,url +------------------------------- +1,1, +1,1, +2,2, +3,2, +3,3, http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java index 33d6565..1c29a6d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java @@ -33,11 +33,12 @@ import org.apache.tajo.util.TUtil; import java.io.IOException; public class AggregationFunctionCallEval extends FunctionEval implements Cloneable { - @Expose boolean intermediatePhase = false; - @Expose boolean finalPhase = true; - @Expose String alias; -// protected AggFunction instance; + // Both firstPhase and lastPhase flags should be true before global planning. + @Expose private boolean firstPhase = true; + @Expose private boolean lastPhase = true; + @Expose private String alias; + @Expose protected FunctionInvokeContext invokeContext; protected transient AggFunctionInvoke functionInvoke; @@ -66,8 +67,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab try { if (evalContext != null && evalContext.hasScriptEngine(this)) { this.invokeContext.setScriptEngine(evalContext.getScriptEngine(this)); - this.invokeContext.getScriptEngine().setIntermediatePhase(intermediatePhase); - this.invokeContext.getScriptEngine().setFinalPhase(finalPhase); + this.invokeContext.getScriptEngine().setFirstPhase(firstPhase); + this.invokeContext.getScriptEngine().setLastPhase(lastPhase); } this.functionInvoke.init(invokeContext); } catch (IOException e) { @@ -85,7 +86,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab } protected void mergeParam(FunctionContext context, Tuple params) { - if (!intermediatePhase && !finalPhase) { + if (firstPhase) { // firstPhase functionInvoke.eval(context, params); } else { @@ -102,7 +103,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab if (!isBound) { throw new IllegalStateException("bind() must be called before terminate()"); } - if (!finalPhase) { + if (!lastPhase) { return functionInvoke.getPartialResult(context); } else { return functionInvoke.terminate(context); @@ -111,7 +112,7 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab @Override public DataType getValueType() { - if (!finalPhase) { + if (!lastPhase) { return functionInvoke.getPartialResultType(); } else { return funcDesc.getReturnType(); @@ -129,8 +130,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab public Object clone() throws CloneNotSupportedException { AggregationFunctionCallEval clone = (AggregationFunctionCallEval)super.clone(); - clone.finalPhase = finalPhase; - clone.intermediatePhase = intermediatePhase; + clone.lastPhase = lastPhase; + clone.firstPhase = firstPhase; clone.alias = alias; clone.invokeContext = (FunctionInvokeContext) invokeContext.clone(); if (functionInvoke != null) { @@ -140,35 +141,31 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab return clone; } - public boolean isIntermediatePhase() { - return intermediatePhase; - } - - public void setIntermediatePhase(boolean flag) { - this.intermediatePhase = flag; + public boolean isFirstPhase() { + return firstPhase; } - public void setFinalPhase(boolean flag) { - this.finalPhase = flag; + public boolean isLastPhase() { + return lastPhase; } - public boolean isFinalPhase() { - return finalPhase; + public void setFirstPhase() { + this.firstPhase = true; + this.lastPhase = false; } - public void setFirstPhase() { - this.finalPhase = false; - this.intermediatePhase = false; + public void setLastPhase() { + this.firstPhase = false; + this.lastPhase = true; } - public void setFinalPhase() { - this.finalPhase = true; - this.intermediatePhase = false; + public void setFirstAndLastPhase() { + this.lastPhase = this.firstPhase = true; } public void setIntermediatePhase() { - this.finalPhase = false; - this.intermediatePhase = true; + this.firstPhase = false; + this.lastPhase = false; } @Override @@ -176,8 +173,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab final int prime = 31; int result = super.hashCode(); result = prime * result + ((alias == null) ? 0 : alias.hashCode()); - result = prime * result + (finalPhase ? 1231 : 1237); - result = prime * result + (intermediatePhase ? 1231 : 1237); + result = prime * result + (lastPhase ? 1231 : 1237); + result = prime * result + (firstPhase ? 1249 : 1259); return result; } @@ -187,8 +184,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab AggregationFunctionCallEval other = (AggregationFunctionCallEval) obj; boolean eq = super.equals(other); - eq &= intermediatePhase == other.intermediatePhase; - eq &= finalPhase == other.finalPhase; + eq &= firstPhase == other.firstPhase; + eq &= lastPhase == other.lastPhase; eq &= TUtil.checkEquals(alias, other.alias); return eq; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 8980de1..0066c39 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -286,14 +286,14 @@ public class PythonScriptEngine extends TajoScriptEngine { setSchema(); } - public PythonScriptEngine(FunctionDesc functionDesc, boolean intermediatePhase, boolean finalPhase) { + public PythonScriptEngine(FunctionDesc functionDesc, boolean firstPhase, boolean lastPhase) { if (!functionDesc.getInvocation().hasPython() && !functionDesc.getInvocation().hasPythonAggregation()) { throw new IllegalStateException("Function type must be 'python'"); } functionSignature = functionDesc.getSignature(); invocationDesc = functionDesc.getInvocation().getPython(); - this.intermediatePhase = intermediatePhase; - this.finalPhase = finalPhase; + this.firstPhase = firstPhase; + this.lastPhase = lastPhase; setSchema(); } @@ -381,7 +381,7 @@ public class PythonScriptEngine extends TajoScriptEngine { outSchema = new Schema(new Column[]{new Column("out", functionSignature.getReturnType())}); } else { // UDAF - if (!intermediatePhase && !finalPhase) { + if (firstPhase) { // first phase TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes(); inSchema = new Schema(); @@ -389,11 +389,12 @@ public class PythonScriptEngine extends TajoScriptEngine { inSchema.addColumn(new Column("in_" + i, paramTypes[i])); } outSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); - } else if (intermediatePhase) { - inSchema = outSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); - } else if (finalPhase) { + } else if (lastPhase) { inSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); outSchema = new Schema(new Column[]{new Column("out", functionSignature.getReturnType())}); + } else { + // intermediate phase + inSchema = outSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); } } projectionCols = new int[outSchema.size()]; @@ -494,7 +495,7 @@ public class PythonScriptEngine extends TajoScriptEngine { public void callAggFunc(FunctionContext functionContext, Tuple input) { String methodName; - if (!intermediatePhase && !finalPhase) { + if (firstPhase) { // eval methodName = "eval"; } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java index c233fb8..8e35955 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java @@ -19,7 +19,6 @@ package org.apache.tajo.plan.function.python; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.storage.Tuple; @@ -32,8 +31,8 @@ import java.net.URI; */ public abstract class TajoScriptEngine { - protected boolean intermediatePhase = false; - protected boolean finalPhase = false; + protected boolean firstPhase = false; + protected boolean lastPhase = false; /** * Open a stream load a script locally or in the classpath @@ -92,11 +91,11 @@ public abstract class TajoScriptEngine { public abstract Datum getFinalResult(FunctionContext functionContext); - public void setIntermediatePhase(boolean flag) { - this.intermediatePhase = flag; + public void setFirstPhase(boolean flag) { + this.firstPhase = flag; } - public void setFinalPhase(boolean flag) { - this.finalPhase = flag; + public void setLastPhase(boolean flag) { + this.lastPhase = flag; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java index 0976ab5..0ab62d5 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java @@ -36,6 +36,11 @@ public class CreateTableNode extends StoreTableNode implements Cloneable { super(pid, NodeType.CREATE_TABLE); } + @Override + public int childNum() { + return child == null ? 0 : 1; + } + public void setTableSchema(Schema schema) { this.schema = schema; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java index bad8704..a0d8125 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/JoinNode.java @@ -22,28 +22,21 @@ package org.apache.tajo.plan.logical; import com.google.gson.annotations.Expose; - import org.apache.tajo.algebra.JoinType; import org.apache.tajo.plan.PlanString; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.BinaryEval; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.util.TUtil; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; public class JoinNode extends BinaryNode implements Projectable, Cloneable { @Expose private JoinType joinType; @Expose private EvalNode joinQual; @Expose private Target[] targets; - // transition states - private boolean candidateBroadcast = false; - private List<LogicalNode> broadcastCandidateTargets = new ArrayList<LogicalNode>(); - public JoinNode(int pid) { super(pid, NodeType.JOIN); } @@ -54,18 +47,6 @@ public class JoinNode extends BinaryNode implements Projectable, Cloneable { setRightChild(right); } - public boolean isCandidateBroadcast() { - return candidateBroadcast; - } - - public void setCandidateBroadcast(boolean candidateBroadcast) { - this.candidateBroadcast = candidateBroadcast; - } - - public List<LogicalNode> getBroadcastCandidateTargets() { - return broadcastCandidateTargets; - } - public JoinType getJoinType() { return this.joinType; } @@ -131,7 +112,6 @@ public class JoinNode extends BinaryNode implements Projectable, Cloneable { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + (candidateBroadcast ? 1231 : 1237); result = prime * result + ((joinQual == null) ? 0 : joinQual.hashCode()); result = prime * result + ((joinType == null) ? 0 : joinType.hashCode()); result = prime * result + Arrays.hashCode(targets); http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java index cc54a22..587baa5 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java @@ -245,6 +245,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo Set<EvalNode> nonPushableQuals = TUtil.newHashSet(); // TODO: non-equi theta join quals must not be pushed until TAJO-742 is resolved. nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(wherePredicates, block, joinNode)); + nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(onPredicates, block, joinNode)); // for outer joins if (PlannerUtil.isOuterJoin(joinNode.getJoinType())) { http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java index 80c18cc..72373cf 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java @@ -34,7 +34,6 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.*; import org.apache.tajo.exception.InternalException; import org.apache.tajo.plan.expr.*; -import org.apache.tajo.plan.function.AggFunction; import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.plan.logical.WindowSpec; import org.apache.tajo.plan.serder.PlanProto.WinFunctionEvalSpec; @@ -191,8 +190,15 @@ public class EvalNodeDeserializer { new AggregationFunctionCallEval(new FunctionDesc(funcProto.getFuncion()), params); PlanProto.AggFunctionEvalSpec aggFunctionProto = protoNode.getAggFunction(); - aggFunc.setIntermediatePhase(aggFunctionProto.getIntermediatePhase()); - aggFunc.setFinalPhase(aggFunctionProto.getFinalPhase()); + if (aggFunctionProto.getFirstPhase() && aggFunctionProto.getLastPhase()) { + aggFunc.setFirstAndLastPhase(); + } else if (aggFunctionProto.getFirstPhase()) { + aggFunc.setFirstPhase(); + } else if (aggFunctionProto.getLastPhase()) { + aggFunc.setLastPhase(); + } else { + aggFunc.setIntermediatePhase(); + } if (aggFunctionProto.hasAlias()) { aggFunc.setAlias(aggFunctionProto.getAlias()); } @@ -200,7 +206,7 @@ public class EvalNodeDeserializer { if (evalContext != null && funcDesc.getInvocation().hasPythonAggregation()) { evalContext.addScriptEngine(current, new PythonScriptEngine(funcDesc, - aggFunc.isIntermediatePhase(), aggFunc.isFinalPhase())); + aggFunc.isFirstPhase() , aggFunc.isLastPhase())); } } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java index e47d620..a03b637 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java @@ -280,8 +280,8 @@ public class EvalNodeSerializer AggregationFunctionCallEval aggFunc = (AggregationFunctionCallEval) function; PlanProto.AggFunctionEvalSpec.Builder aggFunctionEvalBuilder = PlanProto.AggFunctionEvalSpec.newBuilder(); - aggFunctionEvalBuilder.setIntermediatePhase(aggFunc.isIntermediatePhase()); - aggFunctionEvalBuilder.setFinalPhase(aggFunc.isFinalPhase()); + aggFunctionEvalBuilder.setFirstPhase(aggFunc.isFirstPhase()); + aggFunctionEvalBuilder.setLastPhase(aggFunc.isLastPhase()); if (aggFunc.hasAlias()) { aggFunctionEvalBuilder.setAlias(aggFunc.getAlias()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 84991bb..694e81c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -43,11 +43,6 @@ import java.util.*; * It deserializes a list of serialized logical nodes into a logical node tree. */ public class LogicalNodeDeserializer { - private static final LogicalNodeDeserializer instance; - - static { - instance = new LogicalNodeDeserializer(); - } /** * Deserialize a list of nodes into a logical node tree. http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index 60171de..88d831e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -76,11 +76,11 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe private static PlanProto.LogicalNode.Builder createNodeBuilder(SerializeContext context, LogicalNode node) { int selfId; - if (context.idMap.containsKey(node)) { - selfId = context.idMap.get(node); + if (context.idMap.containsKey(node.getPID())) { + selfId = context.idMap.get(node.getPID()); } else { selfId = context.seqId++; - context.idMap.put(node, selfId); + context.idMap.put(node.getPID(), selfId); } PlanProto.LogicalNode.Builder nodeBuilder = PlanProto.LogicalNode.newBuilder(); @@ -100,7 +100,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe public static class SerializeContext { private int seqId = 0; - private Map<LogicalNode, Integer> idMap = Maps.newHashMap(); + private Map<Integer, Integer> idMap = Maps.newHashMap(); // map for PID and visit sequence private LogicalNodeTree.Builder treeBuilder = LogicalNodeTree.newBuilder(); } @@ -485,7 +485,6 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe public LogicalNode visitCreateTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException { super.visitCreateTable(context, plan, block, node, stack); - int [] childIds = registerGetChildIds(context, node); PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = buildPersistentStoreBuilder(node, childIds); @@ -629,7 +628,10 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe private static PlanProto.PersistentStoreNode.Builder buildPersistentStoreBuilder(PersistentStoreNode node, int [] childIds) { PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = PlanProto.PersistentStoreNode.newBuilder(); - persistentStoreBuilder.setChildSeq(childIds[0]); + if (childIds.length > 0) { + // Simple create table may not have any children. This should be improved at TAJO-1589. + persistentStoreBuilder.setChildSeq(childIds[0]); + } persistentStoreBuilder.setStorageType(node.getStorageType()); if (node.hasOptions()) { persistentStoreBuilder.setTableProperties(node.getOptions().getProto()); @@ -719,8 +721,8 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe private int [] registerGetChildIds(SerializeContext context, LogicalNode node) { int [] childIds = new int[node.childNum()]; for (int i = 0; i < node.childNum(); i++) { - if (context.idMap.containsKey(node.getChild(i))) { - childIds[i] = context.idMap.get(node.getChild(i)); + if (node.getChild(i) != null && context.idMap.containsKey(node.getChild(i).getPID())) { + childIds[i] = context.idMap.get(node.getChild(i).getPID()); } else { childIds[i] = context.seqId++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index e8172ba..77a21b7 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -216,17 +216,17 @@ message PartitionTableScanSpec { } message PersistentStoreNode { - required int32 childSeq = 1; + optional int32 childSeq = 1; // CreateTableNode may not have any children. This should be improved at TAJO-1589. required string storageType = 2; required KeyValueSetProto tableProperties = 3; } -message StoreTableNodeSpec { // required PersistentStoreSpec +message StoreTableNodeSpec { // required PersistentStoreNode optional string tableName = 1; // 'INSERT INTO LOCATION' does not require 'table name'. optional PartitionMethodProto partitionMethod = 2; } -message InsertNodeSpec { // required PersistentStoreSpec and StoreTableSpec +message InsertNodeSpec { // required PersistentStoreNode and StoreTableSpec required bool overwrite = 1; required SchemaProto tableSchema = 2; optional SchemaProto targetSchema = 4; @@ -234,7 +234,7 @@ message InsertNodeSpec { // required PersistentStoreSpec and StoreTableSpec optional string path = 5; } -message CreateTableNodeSpec { // required PersistentStoreSpec and StoreTableNodeSpec +message CreateTableNodeSpec { // required PersistentStoreNode and StoreTableNodeSpec required SchemaProto schema = 1; required bool external = 2; required bool ifNotExists = 3; @@ -424,8 +424,8 @@ message FunctionEval { } message AggFunctionEvalSpec { // requires FunctionEval - required bool intermediatePhase = 1; - required bool finalPhase = 2; + required bool firstPhase = 1; + required bool lastPhase = 2; optional string alias = 3; }
