TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2b27f7de Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2b27f7de Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2b27f7de Branch: refs/heads/window_function Commit: 2b27f7de70904552d38801f57aa12396a9df75ac Parents: f1f36ec Author: blrunner <[email protected]> Authored: Mon Apr 21 16:11:13 2014 +0900 Committer: blrunner <[email protected]> Committed: Mon Apr 21 16:11:13 2014 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 2 +- .../engine/planner/global/GlobalPlanner.java | 49 +++++++++++++++++++- .../tajo/master/querymaster/SubQuery.java | 18 +++++++ .../tajo/engine/query/TestJoinBroadcast.java | 14 +++--- .../tajo/master/TestExecutionBlockCursor.java | 2 +- .../querymaster/TestQueryUnitStatusUpdate.java | 8 ++-- .../resources/queries/TestNetTypes/testJoin.sql | 2 +- .../testBroadcastSubquery2.result | 2 +- 9 files changed, 82 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d6ed95a..0fcc83d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -334,6 +334,8 @@ Release 0.8.0 - unreleased BUG FIXES + TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa) + TAJO-777: Partition column in function parameter occurs NPE. 
(Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 3c81ed5..5b3d4b3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -161,7 +161,7 @@ public class TajoConf extends Configuration { ////////////////////////////////////////// // Distributed Query Execution Parameters ////////////////////////////////////////// - DIST_QUERY_BROADCAST_JOIN_AUTO("tajo.dist-query.join.broadcast.auto", true), + DIST_QUERY_BROADCAST_JOIN_AUTO("tajo.dist-query.join.auto-broadcast", true), DIST_QUERY_BROADCAST_JOIN_THRESHOLD("tajo.dist-query.join.broadcast.threshold-bytes", (long)5 * 1048576), DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128), http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index bf2bf7d..edc08fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -210,6 +210,10 @@ public class GlobalPlanner { throw new PlanningException("Invalid State"); } + private static boolean checkIfCanBeOneOfBroadcastJoin(LogicalNode node) { + return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN; + } + private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode, 
ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws PlanningException { @@ -218,6 +222,7 @@ public class GlobalPlanner { boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO); + // Check whether the tajo.dist-query.join.auto-broadcast property is true. if (autoBroadcast && joinNode.isCandidateBroadcast()) { long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD); List<LogicalNode> broadtargetTables = new ArrayList<LogicalNode>(); @@ -234,7 +239,6 @@ public class GlobalPlanner { } } - //large table must be one if (numLargeTables <= 1 && !broadtargetTables.isEmpty()) { currentBlock = masterPlan.newExecutionBlock(); currentBlock.setPlan(joinNode); @@ -250,6 +254,49 @@ public class GlobalPlanner { } } + LogicalNode leftNode = joinNode.getLeftChild(); + LogicalNode rightNode = joinNode.getRightChild(); + + boolean leftBroadcasted = false; + boolean rightBroadcasted = false; + + // Even when the auto-broadcast property is false, a join of two scans with a small table is still planned as a broadcast join. + // This keeps the shuffle output numbers of the join consistent.
+ if (checkIfCanBeOneOfBroadcastJoin(leftNode) && checkIfCanBeOneOfBroadcastJoin(rightNode)) { + ScanNode leftScan = (ScanNode) leftNode; + ScanNode rightScan = (ScanNode) rightNode; + + TableDesc leftDesc = leftScan.getTableDesc(); + TableDesc rightDesc = rightScan.getTableDesc(); + long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD); + + if (leftDesc.getStats().getNumBytes() < broadcastThreshold) { + leftBroadcasted = true; + } + if (rightDesc.getStats().getNumBytes() < broadcastThreshold) { + rightBroadcasted = true; + } + + if (leftBroadcasted || rightBroadcasted) { + currentBlock = masterPlan.newExecutionBlock(); + currentBlock.setPlan(joinNode); + if (leftBroadcasted) { + currentBlock.addBroadcastTable(leftScan.getCanonicalName()); + LOG.info("The left table " + rightScan.getCanonicalName() + " (" + + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table"); + } + if (rightBroadcasted) { + currentBlock.addBroadcastTable(rightScan.getCanonicalName()); + LOG.info("The right table " + rightScan.getCanonicalName() + " (" + + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table"); + } + + context.execBlockMap.remove(leftScan.getPID()); + context.execBlockMap.remove(rightScan.getPID()); + return currentBlock; + } + } + // symmetric repartition join currentBlock = masterPlan.newExecutionBlock(); http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 63b50ac..8929e8d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -742,6 +742,24 @@ 
public class SubQuery implements EventHandler<SubQueryEvent> { taskNum = Math.min(taskNum, slots); LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum); + // The shuffle output numbers of the join inputs may be inconsistent, depending on the execution block order. + // Thus, we compare the computed task number with the shuffle output numbers of the outgoing DataChannels. + // If the two inputs disagree and the task number matches neither, use the larger shuffle output number instead. + int outerShuffleOutptNum = 0, innerShuffleOutputNum = 0; + for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) { + outerShuffleOutptNum = Math.max(outerShuffleOutptNum, eachChannel.getShuffleOutputNum()); + } + + for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) { + innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum()); + } + + if (outerShuffleOutptNum != innerShuffleOutputNum + && taskNum != outerShuffleOutptNum + && taskNum != innerShuffleOutputNum) { + taskNum = Math.max(outerShuffleOutptNum, innerShuffleOutputNum); + } + + return taskNum; // Is this subquery the first step of group-by? http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index 89519ef..f5f98a5 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -365,13 +365,11 @@ public class TestJoinBroadcast extends QueryTestCaseBase { cleanupQuery(res); } - // It doesn't run as expected because of TAJO-747 bug. - // Thus, we need to block this method until resolving this bug.
-// @Test -// public final void testBroadcastSubquery2() throws Exception { -// ResultSet res = executeQuery(); -// assertResultSet(res); -// cleanupQuery(res); -// } + @Test + public final void testBroadcastSubquery2() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java index ab31c8d..f4fa74a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java @@ -118,6 +118,6 @@ public class TestExecutionBlockCursor { count++; } - assertEquals(10, count); + assertEquals(6, count); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java index 07b4ac5..fa89dc3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java @@ -86,11 +86,11 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase { res = executeQuery(); - long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2}; - long[] expectedNumBytes = new long[]{18, 34, 45, 75, 109, 34, 34, 18}; - long[] expectedReadBytes = new long[]{18, 0, 45, 0, 109, 0, 34, 0}; + long[] expectedNumRows 
= new long[]{7, 2, 2, 2, 7, 2, 2, 2}; + long[] expectedNumBytes = new long[]{63, 34, 34, 18, 109, 34, 34, 18}; + long[] expectedReadBytes = new long[]{63, 0, 34, 0, 109, 0, 34, 0}; - assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes); + assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { cleanupQuery(res); } http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql b/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql index ec4f8e6..22c97d5 100644 --- a/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql +++ b/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql @@ -1 +1 @@ -select t1.*,t2.* from table1 as t1, table2 as t2 where t1.addr = t2.addr order by t2.name; \ No newline at end of file +select t1.*,t2.* from table1 as t1, table2 as t2 where t1.addr = t2.addr order by t1.id, t1.name,t2. name; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result index 14c2211..9368976 100644 --- a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result +++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result @@ -1,3 +1,3 @@ ?sum ------------------------------- -360.0 \ No newline at end of file +190.0 \ No newline at end of file
