http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan index 1c2fd7a..c0de2a1 100644 --- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan +++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan @@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER) => out schema: {(1) default.t3.id (INT4)} => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} @@ -29,72 +29,43 @@ Execution Block Graph (TERMINAL - eb_0000000000000_0000_000006) ------------------------------------------------------------------------------- |-eb_0000000000000_0000_000006 |-eb_0000000000000_0000_000005 - |-eb_0000000000000_0000_000004 - |-eb_0000000000000_0000_000003 ------------------------------------------------------------------------------- Order of Execution ------------------------------------------------------------------------------- -1: eb_0000000000000_0000_000003 -2: eb_0000000000000_0000_000004 -3: eb_0000000000000_0000_000005 -4: eb_0000000000000_0000_000006 +1: eb_0000000000000_0000_000005 +2: eb_0000000000000_0000_000006 ------------------------------------------------------------------------------- ======================================================= -Block Id: eb_0000000000000_0000_000003 [LEAF] -======================================================= - -[Outgoing] -[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32) - -[Enforcers] - 0: type=Broadcast, tables=default.t1 - -JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 - => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) - => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} - => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} - SCAN(1) on default.jointable12 as t2 - => target list: default.t2.id (INT4) - => out schema: {(1) default.t2.id (INT4)} - => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)} - SCAN(0) on default.jointable11 as t1 - => target list: default.t1.id (INT4), default.t1.name (TEXT) - => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} - => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} - -======================================================= -Block Id: eb_0000000000000_0000_000004 [LEAF] -======================================================= - -[Outgoing] -[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32) - -SCAN(3) on default.jointable13 as t3 - => target list: default.t3.id (INT4) - => out schema: {(1) default.t3.id (INT4)} - => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} - -======================================================= Block Id: eb_0000000000000_0000_000005 [ROOT] ======================================================= -[Incoming] -[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32) -[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32) +[Enforcers] + 0: type=Broadcast, tables=default.t2 + 1: type=Broadcast, tables=default.t1 JOIN(8)(RIGHT_OUTER) => Join Cond: default.t1.id (INT4) = default.t3.id (INT4) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4) => out schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)} => in schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)} - SCAN(13) on eb_0000000000000_0000_000004 + SCAN(3) on default.jointable13 as t3 + => target list: default.t3.id (INT4) => out schema: {(1) default.t3.id (INT4)} - => in schema: {(1) default.t3.id (INT4)} - SCAN(12) on eb_0000000000000_0000_000003 + => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} + JOIN(7)(RIGHT_OUTER) + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) + => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} + SCAN(1) on default.jointable12 as t2 + => target list: default.t2.id (INT4) + => out schema: {(1) default.t2.id (INT4)} + => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)} + SCAN(0) on default.jointable11 as t1 + => target list: default.t1.id (INT4), default.t1.name (TEXT) + => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} + => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} ======================================================= Block Id: eb_0000000000000_0000_000006 [TERMINAL]
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan index 5a589ff..fab3809 100644 --- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan @@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER) => out schema: {(1) default.t3.id (INT4)} => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} @@ -80,7 +80,7 @@ Block Id: eb_0000000000000_0000_000003 [INTERMEDIATE] [q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32) JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan index 1c2fd7a..c0de2a1 100644 --- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan +++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan @@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER) => out schema: {(1) default.t3.id (INT4)} => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} @@ -29,72 +29,43 @@ Execution Block Graph (TERMINAL - eb_0000000000000_0000_000006) ------------------------------------------------------------------------------- |-eb_0000000000000_0000_000006 |-eb_0000000000000_0000_000005 - |-eb_0000000000000_0000_000004 - |-eb_0000000000000_0000_000003 ------------------------------------------------------------------------------- Order of Execution ------------------------------------------------------------------------------- -1: eb_0000000000000_0000_000003 -2: eb_0000000000000_0000_000004 -3: eb_0000000000000_0000_000005 -4: eb_0000000000000_0000_000006 +1: eb_0000000000000_0000_000005 +2: eb_0000000000000_0000_000006 ------------------------------------------------------------------------------- ======================================================= -Block Id: eb_0000000000000_0000_000003 [LEAF] -======================================================= - -[Outgoing] -[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32) - -[Enforcers] - 0: type=Broadcast, tables=default.t1 - -JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 - => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) - => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} - => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} - SCAN(1) on default.jointable12 as t2 - => target list: default.t2.id (INT4) - => out schema: {(1) default.t2.id (INT4)} - => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)} - SCAN(0) on default.jointable11 as t1 - => target list: default.t1.id (INT4), default.t1.name (TEXT) - => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} - => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} - -======================================================= -Block Id: eb_0000000000000_0000_000004 [LEAF] -======================================================= - -[Outgoing] -[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32) - -SCAN(3) on default.jointable13 as t3 - => target list: default.t3.id (INT4) - => out schema: {(1) default.t3.id (INT4)} - => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} - -======================================================= Block Id: eb_0000000000000_0000_000005 [ROOT] ======================================================= -[Incoming] -[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32) -[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32) +[Enforcers] + 0: type=Broadcast, tables=default.t2 + 1: type=Broadcast, tables=default.t1 JOIN(8)(RIGHT_OUTER) => Join Cond: default.t1.id (INT4) = default.t3.id (INT4) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4) => out schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)} => in schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)} - SCAN(13) on eb_0000000000000_0000_000004 + SCAN(3) on default.jointable13 as t3 + => target list: default.t3.id (INT4) => out schema: {(1) default.t3.id (INT4)} - => in schema: {(1) default.t3.id (INT4)} - SCAN(12) on eb_0000000000000_0000_000003 + => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} + JOIN(7)(RIGHT_OUTER) + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) + => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} + SCAN(1) on default.jointable12 as t2 + => target list: default.t2.id (INT4) + => out schema: {(1) default.t2.id (INT4)} + => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)} + SCAN(0) on default.jointable11 as t1 + => target list: default.t1.id (INT4), default.t1.name (TEXT) + => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} + => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)} ======================================================= Block Id: eb_0000000000000_0000_000006 [TERMINAL] http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan index 5a589ff..fab3809 100644 --- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan +++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan @@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER) => out schema: {(1) default.t3.id (INT4)} => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)} JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} @@ -80,7 +80,7 @@ Block Id: eb_0000000000000_0000_000003 [INTERMEDIATE] [q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32) JOIN(7)(RIGHT_OUTER) - => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33 + => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33)) => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4) => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)} http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 1f878f1..33443da 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -18,7 +18,8 @@ Available Session Variables: \set LC_MONETARY [text value] - Formatting of currency amounts \set LC_NUMERIC [text value] - Formatting of numbers \set LC_TIME [text value] - Formatting of dates and times -\set BROADCAST_TABLE_SIZE_LIMIT [long value] - limited size (bytes) of broadcast table +\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for non-cross join +\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for cross join \set JOIN_TASK_INPUT_SIZE [int value] - join task input size (mb) \set SORT_TASK_INPUT_SIZE [int value] - sort task input size (mb) \set GROUPBY_TASK_INPUT_SIZE [int value] - group by task input size (mb) @@ -41,4 +42,4 @@ Available Session Variables: \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs. \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master \set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution -\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled +\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 5bbf3a9..b04bdc4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -330,20 +330,18 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); switch (algorithm) { - case NESTED_LOOP_JOIN: - LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]"); - return new NLJoinExec(context, plan, leftExec, rightExec); - case BLOCK_NESTED_LOOP_JOIN: - LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]"); - return new BNLJoinExec(context, plan, leftExec, rightExec); default: // fallback algorithm LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name()); - return new BNLJoinExec(context, plan, leftExec, rightExec); + PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec); + return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]); } } else { - return new BNLJoinExec(context, plan, leftExec, rightExec); + LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]"); + // returns two PhysicalExec. smaller one is 0, and larger one is 1. + PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec); + return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]); } } @@ -356,12 +354,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); switch (algorithm) { - case NESTED_LOOP_JOIN: - LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]"); - return new NLJoinExec(context, plan, leftExec, rightExec); - case BLOCK_NESTED_LOOP_JOIN: - LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]"); - return new BNLJoinExec(context, plan, leftExec, rightExec); case IN_MEMORY_HASH_JOIN: LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]"); // returns two PhysicalExec. smaller one is 0, and larger one is 1. @@ -389,7 +381,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { */ @VisibleForTesting public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan, - PhysicalExec left, PhysicalExec right) throws IOException { + PhysicalExec left, PhysicalExec right) throws IOException { String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild()); String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild()); long leftSize = estimateSizeRecursive(context, leftLineage); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index 92ecadd..d67cee8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -302,10 +302,6 @@ public class Enforcer implements ProtoObject<EnforcerProto> { sb.append("type=Join,alg="); if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.MERGE_JOIN) { sb.append("merge_join"); - } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.NESTED_LOOP_JOIN) { - sb.append("nested_loop"); - } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN) { - sb.append("block_nested_loop"); } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.IN_MEMORY_HASH_JOIN) { sb.append("in_memory_hash"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index 4f352c1..c71324d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -194,8 +194,10 @@ public class ExecutionBlock { } public void addBroadcastRelation(ScanNode relationNode) { + if (!broadcastRelations.containsKey(relationNode.getCanonicalName())) { + enforcer.addBroadcast(relationNode.getCanonicalName()); + } broadcastRelations.put(relationNode.getCanonicalName(), relationNode); - enforcer.addBroadcast(relationNode.getCanonicalName()); } public void removeBroadcastRelation(ScanNode relationNode) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index 6f7b4c9..dbb92e1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.global.rewriter.rules; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; import org.apache.tajo.SessionVars; +import org.apache.tajo.algebra.JoinType; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; @@ -28,6 +29,7 @@ import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.util.TUtil; @@ -42,9 +44,6 @@ import java.util.*; * <h3>Broadcastable relation</h3> * A relation is broadcastable when its size is smaller than a given threshold. * - * <h3>Assumetion</h3> - * If every input of an execution block is broadcastable, the output of the execution block is also broadcastable. - * * <h3>Rules to convert repartition join into broadcast join</h3> * <ul> * <li>Given an EB containing a join and its child EBs, those EBs can be merged into a single EB if at least one child EB's output is broadcastable.</li> @@ -65,24 +64,31 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { private BroadcastJoinPlanBuilder planBuilder; private BroadcastJoinPlanFinalizer planFinalizer; + protected void init(MasterPlan plan, long thresholdForNonCrossJoin, long thresholdForCrossJoin, + boolean broadcastForNonCrossJoinEnabled) { + GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder(); + RelationSizeComparator relSizeComparator = new RelationSizeComparator(); + planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, thresholdForNonCrossJoin, + thresholdForCrossJoin, broadcastForNonCrossJoinEnabled); + planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator); + } + @Override public String getName() { - return "BroadcastJoinRule"; + return "Broadcast join rule"; } @Override public boolean isEligible(OverridableConf queryContext, MasterPlan plan) { - if (queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED)) { + long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD); + long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD); + boolean broadcastJoinEnabled = queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED); + if (broadcastJoinEnabled && + (thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) { for (LogicalPlan.QueryBlock block : plan.getLogicalPlan().getQueryBlocks()) { if (block.hasNode(NodeType.JOIN)) { - long broadcastSizeThreshold = queryContext.getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT); - if (broadcastSizeThreshold > 0) { - GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder(); - RelationSizeComparator relSizeComparator = new RelationSizeComparator(); - planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, broadcastSizeThreshold); - planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator); - return true; - } + init(plan, thresholdForNonCrossJoin, thresholdForCrossJoin, broadcastJoinEnabled); + return true; } } } @@ -116,7 +122,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * {@Link BroadcastJoinPlanFinalizer} checks whether every input is the broadcast candidate or not. * If so, it removes the broadcast property from the largest relation. */ - private static class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor<ExecutionBlockId> { + private class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor<ExecutionBlockId> { private final MasterPlan plan; private final RelationSizeComparator relSizeComparator; @@ -141,18 +147,25 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } } - private static class BroadcastJoinPlanBuilder implements DirectedGraphVisitor<ExecutionBlockId> { + private class BroadcastJoinPlanBuilder implements DirectedGraphVisitor<ExecutionBlockId> { private final MasterPlan plan; private final RelationSizeComparator relSizeComparator; - private final long broadcastSizeThreshold; + private final long thresholdForNonCrossJoin; + private final long thresholdForCrossJoin; + private final boolean broadcastForNonCrossJoinEnabled; private final GlobalPlanRewriteUtil.ParentFinder parentFinder; + private final Map<ExecutionBlockId, Long> estimatedEbOutputSize = TUtil.newHashMap(); public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator, - GlobalPlanRewriteUtil.ParentFinder parentFinder, long broadcastSizeThreshold) { + GlobalPlanRewriteUtil.ParentFinder parentFinder, + long thresholdForNonCrossJoin, long thresholdForCrossJoin, + boolean broadcastForNonCrossJoinEnabled) { this.plan = plan; this.relSizeComparator = relationSizeComparator; - this.broadcastSizeThreshold = broadcastSizeThreshold; + this.thresholdForNonCrossJoin = thresholdForNonCrossJoin; + this.thresholdForCrossJoin = thresholdForCrossJoin; this.parentFinder = parentFinder; + this.broadcastForNonCrossJoinEnabled = broadcastForNonCrossJoinEnabled; } @Override @@ -166,28 +179,30 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } } + /** + * Estimate the result size of leaf blocks. + * + * @param current + */ private void visitLeafNode(ExecutionBlock current) { - // At leaf execution blocks, find input relations who's size is smaller than the predefined threshold. + // Preserved-row relations must not be broadcasted to avoid data duplication. if (!current.isPreservedRow()) { - // Preserved-row relations must not be broadcasted to avoid data duplication. - boolean fullyBroadcastable = true; + long totalVolume = 0; for (ScanNode scanNode : current.getScanNodes()) { - if (GlobalPlanRewriteUtil.getTableVolume(scanNode) <= broadcastSizeThreshold) { - current.addBroadcastRelation(scanNode); - } else { - fullyBroadcastable = false; - } - } - if (fullyBroadcastable && current.getScanNodes().length == 1) { - try { - updateScanOfParentAsBroadcastable(plan, current); - } catch (NoScanNodeForChildEbException e) { - // This case is when the current has two or more inputs via union, and simply ignored. - } + totalVolume += GlobalPlanRewriteUtil.getTableVolume(scanNode); } + estimatedEbOutputSize.put(current.getId(), totalVolume); } } + /** + * 1. Based on the join type, find broadcastable relations of the child execution blocks. + * 2. Update the current block's inputs based on the broadcastability of the child blocks. + * 3. Merge child blocks and the current block if the scan to the corresponding child block is broadcastable. + * 4. Estimate the result size of the current block. + * + * @param current + */ private void visitNonLeafNode(ExecutionBlock current) { // At non-leaf execution blocks, merge broadcastable children's plan with the current plan. @@ -195,21 +210,31 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { if (current.hasJoin()) { List<ExecutionBlock> childs = plan.getChilds(current); Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = current.getUnionScanMap(); + LogicalNode found = PlannerUtil.findTopNode(current.getPlan(), NodeType.JOIN); + if (found == null) { + throw new TajoInternalError("ExecutionBlock " + current.getId() + " doesn't have any join operator, " + + "but the master plan indicates that it has."); + } + JoinType joinType = ((JoinNode)found).getJoinType(); + + for (ExecutionBlock child : childs) { + if (!child.isPreservedRow()) { + updateBroadcastableRelForChildEb(child, joinType); + updateInputBasedOnChildEb(child, current); + } + } if (current.hasBroadcastRelation()) { // The current execution block and its every child are able to be merged. for (ExecutionBlock child : childs) { addUnionNodeIfNecessary(unionScanMap, plan, child, current); - mergeTwoPhaseJoin(plan, child, current); + mergeTwoPhaseJoinIfPossible(plan, child, current); } checkTotalSizeOfBroadcastableRelations(current); - // We assume that if every input of an execution block is broadcastable, - // the output of the execution block is also broadcastable. - if (!current.isPreservedRow() && isFullyBroadcastable(current)) { - updateScanOfParentAsBroadcastable(plan, current); - } + long outputVolume = estimateOutputVolume(current); + estimatedEbOutputSize.put(current.getId(), outputVolume); } } else { List<ScanNode> relations = TUtil.newList(current.getBroadcastRelations()); @@ -220,6 +245,134 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } } + private void updateInputBasedOnChildEb(ExecutionBlock child, ExecutionBlock parent) { + if (isFullyBroadcastable(child)) { + if (plan.isLeaf(child) && child.getScanNodes().length == 1) { + try { + updateScanOfParentAsBroadcastable(plan, child, parent); + } catch (NoScanNodeForChildEbException e) { + // This case is when the current has two or more inputs via union, and simply ignored. + } + } else { + updateScanOfParentAsBroadcastable(plan, child, parent); + } + } + } + + private void updateBroadcastableRelForChildEb(ExecutionBlock child, JoinType joinType) { + long threshold = joinType == JoinType.CROSS ? thresholdForCrossJoin : thresholdForNonCrossJoin; + for (ScanNode scanNode : child.getScanNodes()) { + long volume = GlobalPlanRewriteUtil.getTableVolume(scanNode); + if (volume >= 0 && volume <= threshold) { + // If the child eb is already visited, the below line may update its broadcast relations. + // Furthermore, this operation might mark the preserved-row relation as the broadcast relation with outer join. + // However, the rewriting result is still valid. Please consider the following query: + // + // EX) SELECT ... FROM a LEFT OUTER JOIN b on ... LEFT OUTER JOIN c on ... + // + // and assume that three relations of a, b, and c are all broadcastable. + // The initial global plan will be as follow: + // + // EB 2) + // LEFT OUTER JOIN + // / \ + // c EB_1 + // EB 1) + // LEFT OUTER JOIN + // / \ + // a b + // + // When visiting EB_1, the bellow line marks only b as the broadcast relation because a is the preserved-row + // relation. However, when visiting EB_2, it marks both a and b as the broadcast relations because EB_1 is + // the null-supplying relation which has a and b as its inputs. + // Thus, the rewriting result will be like + // + // EB 2) broadcast: a, b + // LEFT OUTER JOIN + // / \ + // c LEFT OUTER JOIN + // / \ + // a b + // + // This plan returns the same result as a plan that broadcasts the result of the first join. + // Obviously, the result must be valid. + child.addBroadcastRelation(scanNode); + } + } + } + + private long estimateOutputVolume(ExecutionBlock block) { + return estimateOutputVolumeInternal(PlannerUtil.<JoinNode>findTopNode(block.getPlan(), NodeType.JOIN)); + } + + private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalError { + + if (node instanceof RelationNode) { + switch (node.getType()) { + case INDEX_SCAN: + case SCAN: + ScanNode scanNode = (ScanNode) node; + if (scanNode.getTableDesc().getStats() == null) { + // TODO - this case means that data is not located in HDFS. So, we need additional + // broadcast method. + return Long.MAX_VALUE; + } else { + return scanNode.getTableDesc().getStats().getNumBytes(); + } + case PARTITIONS_SCAN: + PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; + if (pScanNode.getTableDesc().getStats() == null) { + // TODO - this case means that data is not located in HDFS. So, we need additional + // broadcast method. + return Long.MAX_VALUE; + } else { + // if there is no selected partition + if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { + return 0; + } else { + return pScanNode.getTableDesc().getStats().getNumBytes(); + } + } + case TABLE_SUBQUERY: + return estimateOutputVolumeInternal(((TableSubQueryNode) node).getSubQuery()); + } + } else if (node instanceof UnaryNode) { + return estimateOutputVolumeInternal(((UnaryNode) node).getChild()); + } else if (node instanceof UnionNode) { + UnionNode binaryNode = (UnionNode) node; + return estimateOutputVolumeInternal(binaryNode.getLeftChild()) + + estimateOutputVolumeInternal(binaryNode.getRightChild()); + } else if (node instanceof JoinNode) { + JoinNode joinNode = (JoinNode) node; + JoinSpec joinSpec = joinNode.getJoinSpec(); + long leftChildVolume = estimateOutputVolumeInternal(joinNode.getLeftChild()); + long rightChildVolume = estimateOutputVolumeInternal(joinNode.getRightChild()); + switch (joinNode.getJoinType()) { + case CROSS: + return leftChildVolume * rightChildVolume; + case INNER: + return (long) (leftChildVolume * rightChildVolume * + Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, joinSpec.getPredicates().size())); + case LEFT_OUTER: + return leftChildVolume; + case RIGHT_OUTER: + return rightChildVolume; + case FULL_OUTER: + return leftChildVolume < rightChildVolume ? leftChildVolume : rightChildVolume; + case LEFT_ANTI: + case LEFT_SEMI: + return (long) (leftChildVolume * + Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, joinSpec.getPredicates().size())); + case RIGHT_ANTI: + case RIGHT_SEMI: + return (long) (rightChildVolume * + Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, joinSpec.getPredicates().size())); + } + } + + throw new TajoInternalError("Invalid State at node " + node.getPID()); + } + /** * When the total size of broadcastable relations exceeds the threshold, enforce repartition join for large ones * in order to broadcast as many relations as possible. @@ -227,21 +380,17 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * @param block */ private void checkTotalSizeOfBroadcastableRelations(ExecutionBlock block) { - List<ScanNode> broadcastCandidates = TUtil.newList(); - for (ScanNode scanNode : block.getScanNodes()) { - long estimatedRelationSize = GlobalPlanRewriteUtil.getTableVolume(scanNode); - if (estimatedRelationSize > 0 && estimatedRelationSize <= broadcastSizeThreshold) { - broadcastCandidates.add(scanNode); - } - } + List<ScanNode> broadcastCandidates = TUtil.newList(block.getBroadcastRelations()); Collections.sort(broadcastCandidates, relSizeComparator); // Enforce broadcast for candidates in ascending order of relation size long totalBroadcastVolume = 0; + long largeThreshold = thresholdForCrossJoin > thresholdForNonCrossJoin ? + thresholdForCrossJoin : thresholdForNonCrossJoin; int i; for (i = 0; i < broadcastCandidates.size(); i++) { long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i)); - if (totalBroadcastVolume + volumeOfCandidate > broadcastSizeThreshold) { + if (totalBroadcastVolume + volumeOfCandidate > largeThreshold) { break; } totalBroadcastVolume += volumeOfCandidate; @@ -253,8 +402,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } } - private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current) { - ExecutionBlock parent = plan.getParent(current); + private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current, ExecutionBlock parent) { if (parent != null && !plan.isTerminal(parent)) { ScanNode scanForCurrent = findScanForChildEb(current, parent); parent.addBroadcastRelation(scanForCurrent); @@ -269,7 +417,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * @param parent parent block who has join nodes * @return */ - private ExecutionBlock mergeTwoPhaseJoin(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) { + private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) { ScanNode scanForChild = findScanForChildEb(child, parent); parentFinder.set(scanForChild); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java index b14687d..df1f33b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java @@ -152,7 +152,7 @@ public class GlobalPlanRewriteUtil { return computeDescendentVolume(binaryNode.getLeftChild()) + computeDescendentVolume(binaryNode.getRightChild()); } - throw new TajoInternalError("invalid state"); + throw new TajoInternalError("Invalid State at node " + node.getPID()); } public static class ParentFinder implements LogicalNodeVisitor { http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java deleted file mode 100644 index d28b7f6..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.planner.physical; - -import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.worker.TaskAttemptContext; - -import java.io.IOException; -import java.util.Iterator; - -public class BNLJoinExec extends CommonJoinExec { - - private TupleList leftTupleSlots; - private TupleList rightTupleSlots; - private Iterator<Tuple> leftIterator; - private Iterator<Tuple> rightIterator; - - private boolean leftEnd; - private boolean rightEnd; - - // temporal tuples and states for nested loop join - private Tuple leftTuple = null; - private Tuple rightNext = null; - - private final static int TUPLE_SLOT_SIZE = 10000; - - public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan, - final PhysicalExec leftExec, PhysicalExec rightExec) { - super(context, plan, leftExec, rightExec); - this.leftTupleSlots = new TupleList(TUPLE_SLOT_SIZE); - this.rightTupleSlots = new TupleList(TUPLE_SLOT_SIZE); - this.leftIterator = leftTupleSlots.iterator(); - this.rightIterator = rightTupleSlots.iterator(); - this.rightEnd = false; - this.leftEnd = false; - - // for projection - if (!plan.hasTargets()) { - plan.setTargets(PlannerUtil.schemaToTargets(outSchema)); - } - } - - public Tuple next() throws IOException { - - if (leftTupleSlots.isEmpty()) { - for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { - Tuple t = leftChild.next(); - if (t == null) { - leftEnd = true; - break; - } - leftTupleSlots.add(t); - } - leftIterator = leftTupleSlots.iterator(); - leftTuple = leftIterator.next(); - } - - if (rightTupleSlots.isEmpty()) { - for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { - Tuple t = rightChild.next(); - if (t == null) { - rightEnd = true; - break; - } - rightTupleSlots.add(t); - } - rightIterator = rightTupleSlots.iterator(); - } - - if((rightNext = rightChild.next()) == null){ - rightEnd = true; - } - - while (!context.isStopped()) { - if (!rightIterator.hasNext()) { // if leftIterator ended - if (leftIterator.hasNext()) { // if rightTupleslot remains - leftTuple = leftIterator.next(); - rightIterator = rightTupleSlots.iterator(); - } else { - if (rightEnd) { - rightChild.rescan(); - rightEnd = false; - - if (leftEnd) { - return null; - } - leftTupleSlots.clear(); - for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { - Tuple t = leftChild.next(); - if (t == null) { - leftEnd = true; - break; - } - leftTupleSlots.add(t); - } - if (leftTupleSlots.isEmpty()) { - return null; - } - leftIterator = leftTupleSlots.iterator(); - leftTuple = leftIterator.next(); - - } else { - leftIterator = leftTupleSlots.iterator(); - leftTuple = leftIterator.next(); - } - - rightTupleSlots.clear(); - if (rightNext != null) { - rightTupleSlots.add(rightNext); - for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill right - Tuple t = rightChild.next(); - if (t == null) { - rightEnd = true; - break; - } - rightTupleSlots.add(t); - } - } else { - for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill right - Tuple t = rightChild.next(); - if (t == null) { - rightEnd = true; - break; - } - rightTupleSlots.add(t); - } - } - - if ((rightNext = rightChild.next()) == null) { - rightEnd = true; - } - rightIterator = rightTupleSlots.iterator(); - } - } - - frameTuple.set(leftTuple, rightIterator.next()); - if (!hasJoinQual || joinQual.eval(frameTuple).isTrue()) { - return projector.eval(frameTuple); - } - } - return null; - } - - @Override - public void rescan() throws IOException { - super.rescan(); - rightEnd = false; - rightTupleSlots.clear(); - leftTupleSlots.clear(); - rightIterator = rightTupleSlots.iterator(); - leftIterator = leftTupleSlots.iterator(); - } - - @Override - public void close() throws IOException { - super.close(); - - rightTupleSlots.clear(); - leftTupleSlots.clear(); - rightTupleSlots = null; - leftTupleSlots = null; - rightIterator = null; - leftIterator = null; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index a59960f..e171338 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.Projector; @@ -31,6 +32,7 @@ import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.logical.IndexScanNode; +import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.storage.*; import org.apache.tajo.storage.index.bst.BSTIndex; @@ -42,7 +44,7 @@ import java.net.URI; import java.util.HashSet; import java.util.Set; -public class BSTIndexScanExec extends PhysicalExec { +public class BSTIndexScanExec extends ScanExec { private IndexScanNode plan; private SeekableScanner fileScanner; @@ -85,7 +87,7 @@ public class BSTIndexScanExec extends PhysicalExec { this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - Path indexPath = new Path(indexPrefix.toString(), context.getUniqueKeyFromFragments()); + Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment)); this.reader = new BSTIndex(context.getConf()). getIndexReader(indexPath, keySchema, comparator); this.reader.open(); @@ -109,6 +111,21 @@ public class BSTIndexScanExec extends PhysicalExec { } @Override + public String getTableName() { + return plan.getTableName(); + } + + @Override + public String getCanonicalName() { + return plan.getCanonicalName(); + } + + @Override + public FragmentProto[] getFragments() { + return new FragmentProto[]{fragment}; + } + + @Override public void init() throws IOException { Schema projected; @@ -151,6 +168,11 @@ public class BSTIndexScanExec extends PhysicalExec { } } + @Override + public ScanNode getScanNode() { + return plan; + } + private void initScanner(Schema projected) throws IOException { // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422 http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java index c2d93bb..62af4e1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java @@ -29,9 +29,7 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx // Please keep all physical executors except for abstract class. // They should be ordered in an lexicography order of their names for easy code maintenance. - if (exec instanceof BNLJoinExec) { - return visitBNLJoin(context, (BNLJoinExec) exec, stack); - } else if (exec instanceof BSTIndexScanExec) { + if (exec instanceof BSTIndexScanExec) { return visitBSTIndexScan(context, (BSTIndexScanExec) exec, stack); } else if (exec instanceof EvalExprExec) { return visitEvalExpr(context, (EvalExprExec) exec, stack); @@ -63,8 +61,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx return visitMergeFullOuterJoin(context, (MergeFullOuterJoinExec) exec, stack); } else if (exec instanceof MergeJoinExec) { return visitMergeJoin(context, (MergeJoinExec) exec, stack); - } else if (exec instanceof NLJoinExec) { - return visitNLJoin(context, (NLJoinExec) exec, stack); } else if (exec instanceof ProjectionExec) { return visitProjection(context, (ProjectionExec) exec, stack); } else if (exec instanceof RangeShuffleFileWriteExec) { @@ -81,6 +77,8 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx return visitSortBasedColPartitionStore(context, (SortBasedColPartitionStoreExec) exec, stack); } else if (exec instanceof StoreTableExec) { return visitStoreTable(context, (StoreTableExec) exec, stack); + } else if (exec instanceof StoreIndexExec) { + return visitStoreIndex(context, (StoreIndexExec) exec, stack); } throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName()); @@ -104,12 +102,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx } @Override - public RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack) - throws PhysicalPlanningException { - return visitBinaryExecutor(context, exec, stack); - } - - @Override public RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException { return null; @@ -206,12 +198,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx } @Override - public RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws - PhysicalPlanningException { - return visitBinaryExecutor(context, exec, stack); - } - - @Override public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException { return visitUnaryExecutor(context, exec, stack); @@ -253,7 +239,14 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx } @Override - public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException { + public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) + throws PhysicalPlanningException { + return visitUnaryExecutor(context, exec, stack); + } + + @Override + public RESULT visitStoreIndex(CONTEXT context, StoreIndexExec exec, Stack<PhysicalExec> stack) + throws PhysicalPlanningException { return visitUnaryExecutor(context, exec, stack); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java index 0d64e65..a248d52 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java @@ -23,6 +23,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.engine.utils.CacheHolder; import org.apache.tajo.engine.utils.TableCacheKey; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; @@ -40,50 +41,76 @@ import java.util.List; */ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { - protected final List<Column[]> joinKeyPairs; - // temporal tuples and states for nested loop join protected boolean first = true; protected TupleMap<T> tupleSlots; protected Iterator<Tuple> iterator; + protected final boolean isCrossJoin; + protected final List<Column[]> joinKeyPairs; + protected final int rightNumCols; protected final int leftNumCols; protected final Column[] leftKeyList; protected final Column[] rightKeyList; - protected boolean finished; protected final KeyProjector leftKeyExtractor; - public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { - super(context, plan, outer, inner); + protected boolean finished; - // HashJoin only can manage equi join key pairs. - this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), - inner.getSchema(), false); + protected TableStats tableStatsOfCachedRightChild = null; - leftKeyList = new Column[joinKeyPairs.size()]; - rightKeyList = new Column[joinKeyPairs.size()]; + public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { + super(context, plan, outer, inner); - for (int i = 0; i < joinKeyPairs.size(); i++) { - leftKeyList[i] = outer.getSchema().getColumn(joinKeyPairs.get(i)[0].getQualifiedName()); - rightKeyList[i] = inner.getSchema().getColumn(joinKeyPairs.get(i)[1].getQualifiedName()); + switch (plan.getJoinType()) { + + case CROSS: + if (hasJoinQual) { + throw new TajoInternalError("Cross join cannot evaluate join conditions."); + } else { + isCrossJoin = true; + joinKeyPairs = null; + rightNumCols = leftNumCols = -1; + leftKeyList = rightKeyList = null; + leftKeyExtractor = null; + } + break; + + case INNER: + // Other join types except INNER join can have empty join condition. + if (!hasJoinQual) { + throw new TajoInternalError("Inner join must have any join conditions."); + } + default: + isCrossJoin = false; + // HashJoin only can manage equi join key pairs. + this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), + inner.getSchema(), false); + + leftKeyList = new Column[joinKeyPairs.size()]; + rightKeyList = new Column[joinKeyPairs.size()]; + + for (int i = 0; i < joinKeyPairs.size(); i++) { + leftKeyList[i] = outer.getSchema().getColumn(joinKeyPairs.get(i)[0].getQualifiedName()); + rightKeyList[i] = inner.getSchema().getColumn(joinKeyPairs.get(i)[1].getQualifiedName()); + } + + leftNumCols = outer.getSchema().size(); + rightNumCols = inner.getSchema().size(); + + leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList); + break; } - - leftNumCols = outer.getSchema().size(); - rightNumCols = inner.getSchema().size(); - - leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList); } protected void loadRightToHashTable() throws IOException { ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class); if (scanExec.canBroadcast()) { /* If this table can broadcast, all tasks in a node will share the same cache */ - TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey( - context, scanExec.getCanonicalName(), scanExec.getFragments()); + TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(context, scanExec); loadRightFromCache(key); } else { this.tupleSlots = convert(buildRightToHashTable(), false); @@ -105,10 +132,31 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { sharedResource.addBroadcastCache(key, holder); } } + this.tableStatsOfCachedRightChild = holder.getTableStats(); this.tupleSlots = convert(holder.getData(), true); } protected TupleMap<TupleList> buildRightToHashTable() throws IOException { + if (isCrossJoin) { + return buildRightToHashTableForCrossJoin(); + } else { + return buildRightToHashTableForNonCrossJoin(); + } + } + + protected TupleMap<TupleList> buildRightToHashTableForCrossJoin() throws IOException { + Tuple tuple; + TupleMap<TupleList> map = new TupleMap<>(1); + TupleList tuples = new TupleList(); + + while (!context.isStopped() && (tuple = rightChild.next()) != null) { + tuples.add(tuple); + } + map.put(null, tuples); + return map; + } + + protected TupleMap<TupleList> buildRightToHashTableForNonCrossJoin() throws IOException { Tuple tuple; TupleMap<TupleList> map = new TupleMap<TupleList>(100000); KeyProjector keyProjector = new KeyProjector(rightSchema, rightKeyList); @@ -162,7 +210,8 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { inputStats.setNumRows(leftInputStats.getNumRows()); } - TableStats rightInputStats = rightChild.getInputStats(); + TableStats rightInputStats = tableStatsOfCachedRightChild == null ? + rightChild.getInputStats() : tableStatsOfCachedRightChild; if (rightInputStats != null) { inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index c0a8622..c463028 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -130,7 +130,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, Tupl TupleMap<Pair<Boolean, TupleList>> tuples = new TupleMap<Pair<Boolean, TupleList>>(hashed.size()); for (Map.Entry<KeyTuple, TupleList> entry : hashed.entrySet()) { // flag: initially false (whether this join key had at least one match on the counter part) - tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<Boolean, TupleList>(false, entry.getValue())); + tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<>(false, entry.getValue())); } return tuples; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index bd817bb..cca3548 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -59,7 +59,12 @@ public class HashJoinExec extends CommonHashJoinExec<TupleList> { frameTuple.setLeft(leftTuple); // getting corresponding right - Iterable<Tuple> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); + Iterable<Tuple> hashed; + if (!isCrossJoin) { + hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); + } else { + hashed = tupleSlots.get(null); + } Iterator<Tuple> rightTuples = rightFiltered(hashed); if (rightTuples.hasNext()) { iterator = rightTuples; http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java new file mode 100644 index 0000000..3b8317f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; + +public class IndexExecutorUtil { + + public static String getIndexFileName(FragmentProto fragmentProto) { + FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, fragmentProto); + StringBuilder sb = new StringBuilder(); + sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java deleted file mode 100644 index d3214c3..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.planner.physical; - -import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.worker.TaskAttemptContext; - -import java.io.IOException; - -public class NLJoinExec extends CommonJoinExec { - - // temporal tuples and states for nested loop join - private boolean needNewOuter; - private Tuple outerTuple = null; - private Tuple innerTuple = null; - - public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, - PhysicalExec inner) { - super(context, plan, outer, inner); - // for join - needNewOuter = true; - } - - public Tuple next() throws IOException { - while (!context.isStopped()) { - if (needNewOuter) { - outerTuple = leftChild.next(); - if (outerTuple == null) { - return null; - } - needNewOuter = false; - } - - innerTuple = rightChild.next(); - if (innerTuple == null) { - needNewOuter = true; - rightChild.rescan(); - continue; - } - - frameTuple.set(outerTuple, innerTuple); - if (hasJoinQual) { - if (joinQual.eval(frameTuple).isTrue()) { - return projector.eval(frameTuple); - } - } else { - return projector.eval(frameTuple); - } - } - return null; - } - - @Override - public void rescan() throws IOException { - super.rescan(); - needNewOuter = true; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index c1a451a..2b6191e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -68,6 +68,11 @@ public class PartitionMergeScanExec extends ScanExec { super.init(); } + @Override + public ScanNode getScanNode() { + return plan; + } + private void initScanExecutors() throws IOException { if (scanners.size() > 0) { iterator = scanners.iterator(); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java index c4d90a5..554c31e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java @@ -24,9 +24,6 @@ import java.util.Stack; public interface PhysicalExecutorVisitor<CONTEXT, RESULT> { - RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack) - throws PhysicalPlanningException; - RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException; @@ -76,9 +73,6 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> { RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException; - RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) - throws PhysicalPlanningException; - RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException; @@ -103,4 +97,7 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> { RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException; + + RESULT visitStoreIndex(CONTEXT context, StoreIndexExec exec, Stack<PhysicalExec> stack) + throws PhysicalPlanningException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java index 5cca4c5..45379bb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.worker.TaskAttemptContext; @@ -49,6 +50,8 @@ public abstract class ScanExec extends PhysicalExec { super.init(); } + public abstract ScanNode getScanNode(); + public boolean canBroadcast() { return canBroadcast; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index b49fa40..1ecabf1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -211,6 +211,11 @@ public class SeqScanExec extends ScanExec { } @Override + public ScanNode getScanNode() { + return plan; + } + + @Override protected void compile() throws CompilationError { if (plan.hasQual()) { qual = context.getPrecompiledEval(inSchema, qual); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java index f9db842..fed1d5c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -27,6 +27,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.plan.logical.CreateIndexNode; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; @@ -65,8 +66,15 @@ public class StoreIndexExec extends UnaryPhysicalExec { indexKeys[i] = inSchema.getColumnId(col.getQualifiedName()); } + // TODO: this line should be improved to allow multiple scan executors. + ScanExec scanExec = PhysicalPlanUtil.findExecutor(this, ScanExec.class); + if (scanExec == null) { + throw new TajoInternalError("Cannot find scan executors."); + } + TajoConf conf = context.getConf(); - Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments()); + Path indexPath = new Path(logicalPlan.getIndexPath().toString(), + IndexExecutorUtil.getIndexFileName(scanExec.getFragments()[0])); // TODO: Create factory using reflection BSTIndex bst = new BSTIndex(conf); this.comparator = new BaseTupleComparator(keySchema, sortSpecs); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java index 05936be..6eda7e8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java @@ -18,8 +18,10 @@ package org.apache.tajo.engine.utils; +import org.apache.tajo.QueryId; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.engine.planner.physical.ScanExec; import org.apache.tajo.engine.planner.physical.TupleList; import org.apache.tajo.engine.planner.physical.TupleMap; import org.apache.tajo.storage.fragment.Fragment; @@ -77,19 +79,17 @@ public interface CacheHolder<T> { if(rowBlock != null) rowBlock.release(); } - public static TableCacheKey getCacheKey(TaskAttemptContext ctx, String canonicalName, - CatalogProtos.FragmentProto[] fragments) throws IOException { - String pathNameKey = ""; - if (fragments != null) { - StringBuilder stringBuilder = new StringBuilder(); - for (CatalogProtos.FragmentProto f : fragments) { - Fragment fragement = FragmentConvertor.convert(ctx.getConf(), f); - stringBuilder.append(fragement.getKey()); - } - pathNameKey = stringBuilder.toString(); - } - - return new TableCacheKey(ctx.getTaskId().getTaskId().getExecutionBlockId().toString(), canonicalName, pathNameKey); + public static TableCacheKey getCacheKey(TaskAttemptContext ctx, ScanExec scanExec) throws IOException { + + return new TableCacheKey(ctx.getTaskId().getTaskId().getExecutionBlockId().toString(), + scanExec.getCanonicalName(), getUniqueKey(ctx, scanExec)); + } + + public static String getUniqueKey(TaskAttemptContext context, ScanExec scanExec) { + QueryId queryId = context.getTaskId().getTaskId().getExecutionBlockId().getQueryId(); + int pid = scanExec.getScanNode().getPID(); + + return queryId.toString() + "_" + pid; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index aee7972..d651154 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -49,10 +49,7 @@ import org.apache.tajo.plan.logical.InsertNode; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.verifier.LogicalPlanVerifier; -import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; -import org.apache.tajo.plan.verifier.SyntaxErrorUtil; -import org.apache.tajo.plan.verifier.VerificationState; +import org.apache.tajo.plan.verifier.*; import org.apache.tajo.session.Session; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; @@ -75,6 +72,7 @@ public class GlobalEngine extends AbstractService { private LogicalPlanner planner; private LogicalOptimizer optimizer; private LogicalPlanVerifier annotatedPlanVerifier; + private PostLogicalPlanVerifier postLogicalPlanVerifier; private QueryExecutor queryExecutor; private DDLExecutor ddlExecutor; @@ -96,6 +94,7 @@ public class GlobalEngine extends AbstractService { // Access path rewriter is enabled only in QueryMasterTask optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog()); annotatedPlanVerifier = new LogicalPlanVerifier(); + postLogicalPlanVerifier = new PostLogicalPlanVerifier(); } catch (Throwable t) { LOG.error(t.getMessage(), t); throw new RuntimeException(t); @@ -267,7 +266,6 @@ public class GlobalEngine extends AbstractService { VerificationState state = new VerificationState(); preVerifier.verify(queryContext, state, expression); if (!state.verified()) { - for (Throwable error : state.getErrors()) { throw error; } @@ -294,6 +292,13 @@ public class GlobalEngine extends AbstractService { } } + postLogicalPlanVerifier.verify(queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD), state, plan); + if (!state.verified()) { + for (Throwable error : state.getErrors()) { + throw error; + } + } + return plan; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java index fc7e0e3..e209538 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java @@ -26,15 +26,13 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.rm.NodeStatus; import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.ProtoUtil; import java.net.InetSocketAddress; import java.util.Collection; @@ -128,7 +126,7 @@ public class QueryCoordinatorService extends AbstractService { */ @Override public void getAllWorkers(RpcController controller, PrimitiveProtos.NullProto request, - RpcCallback<WorkerConnectionsResponse> done) { + RpcCallback<WorkerConnectionsResponse> done) { WorkerConnectionsResponse.Builder builder = WorkerConnectionsResponse.newBuilder(); Collection<NodeStatus> nodeStatuses = context.getResourceManager().getRMContext().getNodes().values(); http://git-wip-us.apache.org/repos/asf/tajo/blob/1a7c353c/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index e22663a..b848876 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -25,6 +25,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.ResourceProtos.AllocationResourceProto; import org.apache.tajo.ResourceProtos.QueryExecutionRequest; import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; @@ -259,12 +260,12 @@ public class QueryInProgress { // Update diagnosis message if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { this.queryInfo.setLastMessage(queryInfo.getLastMessage()); - LOG.info(queryId + queryInfo.getLastMessage()); } // if any error occurs, print outs the error message - if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { - LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); + if (this.queryInfo.getQueryState() == QueryState.QUERY_FAILED || + this.queryInfo.getQueryState() == QueryState.QUERY_ERROR) { + LOG.warn(queryId + " is stopped because " + queryInfo.getLastMessage()); } // terminal state will let client to retrieve a query result
