Repository: tajo Updated Branches: refs/heads/master 2a6b38e84 -> a5de83720
TAJO-972: Broadcast join with left outer join returns duplicated rows.(Hyoungjun Kim via jaehwa) Closes #89 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a5de8372 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a5de8372 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a5de8372 Branch: refs/heads/master Commit: a5de837209a8d6d9685ad1aa8132b3b4ecd99727 Parents: 2a6b38e Author: blrunner <[email protected]> Authored: Wed Jul 23 11:26:42 2014 +0900 Committer: blrunner <[email protected]> Committed: Wed Jul 23 11:26:42 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../engine/planner/global/GlobalPlanner.java | 4 +- .../planner/global/TestBroadcastJoinPlan.java | 94 ++++++++++++-------- .../tajo/engine/query/TestJoinBroadcast.java | 47 +++++++++- 4 files changed, 104 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8a1aae6..2be9b26 100644 --- a/CHANGES +++ b/CHANGES @@ -97,6 +97,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-972: Broadcast join with left outer join returns duplicated rows. + (Hyoungjun Kim via jaehwa) + TAJO-666: java.nio.BufferOverflowException occurs when the query includes an order by clause on a TEXT column. 
(Mai Hai Thanh via jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 69ecd02..2daf799 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -293,7 +293,7 @@ public class GlobalPlanner { // Checking Left Side of Join if (ScanNode.isScanNode(leftNode)) { ScanNode scanNode = (ScanNode)leftNode; - if (getTableVolume(scanNode) >= broadcastThreshold) { + if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) { numLargeTables++; } else { leftBroadcast = true; @@ -306,7 +306,7 @@ public class GlobalPlanner { // Checking Right Side OF Join if (ScanNode.isScanNode(rightNode)) { ScanNode scanNode = (ScanNode)rightNode; - if (getTableVolume(scanNode) >= broadcastThreshold) { + if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) { numLargeTables++; } else { rightBroadcast = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java index fd07ae4..ec39609 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java @@ -495,11 
+495,13 @@ public class TestBroadcastJoinPlan { // ((((default.small1 ⋈ default.small2) ⋈ default.small3) ⋈ default.large1) ⋈ default.large2) /* - |-eb_1402495213549_0000_000007 - |-eb_1402495213549_0000_000006 (GROUP BY) - |-eb_1402495213549_0000_000005 (JOIN) - |-eb_1402495213549_0000_000004 (LEAF, large2) - |-eb_1402495213549_0000_000003 (LEAF, broadcast JOIN small1, small2, small3, large1) + |-eb_1406022243130_0000_000009 + |-eb_1406022243130_0000_000008 + |-eb_1406022243130_0000_000007 (join) + |-eb_1406022243130_0000_000006 (scan large2) + |-eb_1406022243130_0000_000005 (join) + |-eb_1406022243130_0000_000004 (scan large1) + |-eb_1406022243130_0000_000003 (scan small1, broadcast join small2, small3) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); @@ -508,9 +510,9 @@ public class TestBroadcastJoinPlan { ExecutionBlock eb = ebCursor.nextBlock(); if(index == 0) { Collection<String> broadcastTables = eb.getBroadcastTables(); - assertEquals(3, broadcastTables.size()); + assertEquals(2, broadcastTables.size()); - assertTrue(broadcastTables.contains("default.small1")); + assertTrue(!broadcastTables.contains("default.small1")); assertTrue(broadcastTables.contains("default.small2")); assertTrue(broadcastTables.contains("default.small3")); } else if(index == 1 || index == 2 || index == 3) { @@ -520,7 +522,7 @@ public class TestBroadcastJoinPlan { index++; } - assertEquals(5, index); + assertEquals(7, index); } @Test @@ -712,9 +714,9 @@ public class TestBroadcastJoinPlan { globalPlanner.build(masterPlan); /* - |-eb_1402500846700_0000_000007 - |-eb_1402500846700_0000_000006 - |-eb_1402500846700_0000_000005 (LEAF, broadcast join small1, small2, small3) + |-eb_1406022971444_0000_000005 + |-eb_1406022971444_0000_000004 (group by) + |-eb_1406022971444_0000_000003 (scan small1, broadcast join small2, small3) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); @@ -735,7 +737,10 @@ public class TestBroadcastJoinPlan {
assertEquals("default.small2", scanNode.getCanonicalName()); Collection<String> broadcastTables = eb.getBroadcastTables(); - assertEquals(3, broadcastTables.size()); + assertEquals(2, broadcastTables.size()); + + assertTrue(broadcastTables.contains("default.small2")); + assertTrue(broadcastTables.contains("default.small3")); } else if(index == 1) { Collection<String> broadcastTables = eb.getBroadcastTables(); assertEquals(0, broadcastTables.size()); @@ -769,9 +774,11 @@ public class TestBroadcastJoinPlan { //(((default.small1 ⋈ default.small2) ⋈ default.large1) ⋈ default.small3) /* - |-eb_1402642709028_0000_000005 - |-eb_1402642709028_0000_000004 (GROUP BY) - |-eb_1402642709028_0000_000003 (LEAF, broadcast JOIN small1, small2, small3, large1) + |-eb_1406023347983_0000_000007 + |-eb_1406023347983_0000_000006 + |-eb_1406023347983_0000_000005 (join, broadcast small3) + |-eb_1406023347983_0000_000004 (scan large1) + |-eb_1406023347983_0000_000003 (scan small1, broadcast join small2) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); @@ -780,19 +787,20 @@ public class TestBroadcastJoinPlan { ExecutionBlock eb = ebCursor.nextBlock(); if(index == 0) { Collection<String> broadcastTables = eb.getBroadcastTables(); - assertEquals(3, broadcastTables.size()); - - assertTrue(broadcastTables.contains("default.small1")); + assertEquals(1, broadcastTables.size()); assertTrue(broadcastTables.contains("default.small2")); + } else if (index == 2) { + Collection<String> broadcastTables = eb.getBroadcastTables(); + assertEquals(1, broadcastTables.size()); assertTrue(broadcastTables.contains("default.small3")); - } else if(index == 1 || index == 2 || index == 3) { + } else if(index == 1 || index == 3) { Collection<String> broadcastTables = eb.getBroadcastTables(); assertEquals(0, broadcastTables.size()); } index++; } - assertEquals(3, index); + assertEquals(5, index); } @Test @@ -820,11 +828,13 @@ public class TestBroadcastJoinPlan { // ((((default.small1 ⋈
default.small2) ⋈ default.large1) ⋈ default.large2) ⋈ default.small3) /* - |-eb_1404125948432_0000_000007 - |-eb_1404125948432_0000_000006 - |-eb_1404125948432_0000_000005 (JOIN broadcast small3) - |-eb_1404125948432_0000_000004 (LEAF, scan large2) - |-eb_1404125948432_0000_000003 (LEAF, scan large1, broadcast small1, small2) + |-eb_1406023537578_0000_000009 + |-eb_1406023537578_0000_000008 + |-eb_1406023537578_0000_000007 (join, broadcast small3) + |-eb_1406023537578_0000_000006 (scan large2) + |-eb_1406023537578_0000_000005 (join) + |-eb_1406023537578_0000_000004 (scan large1) + |-eb_1406023537578_0000_000003 (scan small1, broadcast join small2) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); int index = 0; @@ -835,26 +845,34 @@ public class TestBroadcastJoinPlan { assertEquals(NodeType.JOIN, node.getType()); JoinNode joinNode = (JoinNode)node; - JoinNode joinNode2 = joinNode.getLeftChild(); + ScanNode scanNode1 = joinNode.getLeftChild(); ScanNode scanNode2 = joinNode.getRightChild(); - assertEquals("default.large1", scanNode2.getCanonicalName()); - - ScanNode scanNode3 = joinNode2.getLeftChild(); - ScanNode scanNode4 = joinNode2.getRightChild(); - assertEquals("default.small1", scanNode3.getCanonicalName()); - assertEquals("default.small2", scanNode4.getCanonicalName()); + assertEquals("default.small1", scanNode1.getCanonicalName()); + assertEquals("default.small2", scanNode2.getCanonicalName()); Collection<String> broadcastTables = eb.getBroadcastTables(); - assertEquals(2, broadcastTables.size()); + assertEquals(1, broadcastTables.size()); + assertTrue(broadcastTables.contains("default.small2")); } else if (index == 1) { LogicalNode node = eb.getPlan(); assertEquals(NodeType.SCAN, node.getType()); - ScanNode scanNode = (ScanNode)node; + ScanNode scanNode = (ScanNode) node; + assertEquals("default.large1", scanNode.getCanonicalName()); + + Collection<String> broadcastTables = eb.getBroadcastTables(); + assertEquals(0,
broadcastTables.size()); + } else if (index == 2) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.JOIN, node.getType()); + } else if (index == 3) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.SCAN, node.getType()); + ScanNode scanNode = (ScanNode) node; assertEquals("default.large2", scanNode.getCanonicalName()); Collection<String> broadcastTables = eb.getBroadcastTables(); assertEquals(0, broadcastTables.size()); - } else if(index == 2) { + } else if(index == 4) { LogicalNode node = eb.getPlan(); assertEquals(NodeType.GROUP_BY, node.getType()); @@ -866,8 +884,8 @@ public class TestBroadcastJoinPlan { ScanNode scanNode2 = joinNode1.getLeftChild(); ScanNode scanNode3 = joinNode1.getRightChild(); - assertTrue(scanNode2.getCanonicalName().indexOf("0000_000003") > 0); - assertTrue(scanNode3.getCanonicalName().indexOf("0000_000004") > 0); + assertTrue(scanNode2.getCanonicalName().indexOf("0000_000005") > 0); + assertTrue(scanNode3.getCanonicalName().indexOf("0000_000006") > 0); Collection<String> broadcastTables = eb.getBroadcastTables(); assertEquals(1, broadcastTables.size()); @@ -875,7 +893,7 @@ public class TestBroadcastJoinPlan { index++; } - assertEquals(5, index); + assertEquals(7, index); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index e01b3c5..9cc65bc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.*; import org.apache.tajo.catalog.*; +import 
org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.Int4Datum; @@ -32,11 +33,9 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManagerFactory; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.*; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -576,4 +575,44 @@ public class TestJoinBroadcast extends QueryTestCaseBase { appender.flush(); appender.close(); } + + @Test + public final void testLeftOuterJoinLeftSideSmallTable() throws Exception { + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + String[] data = new String[]{ "1000000|a", "1000001|b", "2|c", "3|d", "4|e" }; + TajoTestingCluster.createTable("table1", schema, tableOptions, data, 1); + + data = new String[10000]; + for (int i = 0; i < data.length; i++) { + data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i; + } + TajoTestingCluster.createTable("table_large", schema, tableOptions, data, 2); + + try { + ResultSet res = executeString( + "select a.id, b.name from table1 a left outer join table_large b on a.id = b.id order by a.id" + ); + + String expected = "id,name\n" + + "-------------------------------\n" + + "2,this is 
testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable2\n" + + "3,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable3\n" + + "4,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable4\n" + + "1000000,null\n" + + "1000001,null\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } finally { + executeString("DROP TABLE table1 PURGE").close(); + executeString("DROP TABLE table_large PURGE").close(); + } + } }
