TAJO-894: Left outer join with partitioned large table and small table returns empty result. (Hyoungjun Kim via hyunsik)
Closes #50 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/88d04346 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/88d04346 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/88d04346 Branch: refs/heads/window_function Commit: 88d043467a7da5971cc3e0f535eb158cfa1d1ac5 Parents: 38f1a57 Author: Hyunsik Choi <[email protected]> Authored: Mon Jun 30 19:41:55 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Jun 30 19:41:55 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../engine/planner/global/GlobalPlanner.java | 39 ++++++++++++++++--- .../master/rm/TajoWorkerResourceManager.java | 3 +- .../tajo/engine/query/TestJoinBroadcast.java | 40 ++++++++++++++++++++ 4 files changed, 79 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 61b5529..06f6ec1 100644 --- a/CHANGES +++ b/CHANGES @@ -74,6 +74,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-894: Left outer join with partitioned large table and small table + returns empty result. (Hyoungjun Kim via hyunsik) + TAJO-867: OUTER JOIN with empty result subquery produces a wrong result. (Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index f12b7e2..edd5674 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -199,7 +199,6 @@ public class GlobalPlanner { if (node instanceof RelationNode) { switch (node.getType()) { case SCAN: - case PARTITIONS_SCAN: ScanNode scanNode = (ScanNode) node; if (scanNode.getTableDesc().getStats() == null) { // TODO - this case means that data is not located in HDFS. So, we need additional @@ -208,6 +207,20 @@ public class GlobalPlanner { } else { return scanNode.getTableDesc().getStats().getNumBytes(); } + case PARTITIONS_SCAN: + PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node; + if (pScanNode.getTableDesc().getStats() == null) { + // TODO - this case means that data is not located in HDFS. So, we need additional + // broadcast method. + return Long.MAX_VALUE; + } else { + // if there is no selected partition + if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { + return 0; + } else { + return pScanNode.getTableDesc().getStats().getNumBytes(); + } + } case TABLE_SUBQUERY: return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery()); default: @@ -227,6 +240,23 @@ public class GlobalPlanner { return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN; } + /** + * Get a volume of a table of a partitioned table + * @param scanNode ScanNode corresponding to a table + * @return table volume (bytes) + */ + private static long getTableVolume(ScanNode scanNode) { + long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); + if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { + PartitionedTableScanNode pScanNode = (PartitionedTableScanNode)scanNode; + if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { + scanBytes = 0L; + } + } + + return scanBytes; + } + private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode, ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws PlanningException { @@ -242,8 +272,7 @@ public class GlobalPlanner { int numLargeTables = 0; for(LogicalNode eachNode: joinNode.getBroadcastTargets()) { ScanNode scanNode = (ScanNode)eachNode; - TableDesc tableDesc = scanNode.getTableDesc(); - if (tableDesc.getStats().getNumBytes() < broadcastThreshold) { + if (getTableVolume(scanNode) < broadcastThreshold) { broadtargetTables.add(scanNode); LOG.info("The table " + scanNode.getCanonicalName() + " (" + scanNode.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table"); @@ -283,10 +312,10 @@ public class GlobalPlanner { TableDesc rightDesc = rightScan.getTableDesc(); long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD); - if (leftDesc.getStats().getNumBytes() < broadcastThreshold) { + if (getTableVolume(leftScan) < broadcastThreshold) { leftBroadcasted = true; } - if (rightDesc.getStats().getNumBytes() < broadcastThreshold) { + if (getTableVolume(rightScan) < broadcastThreshold) { rightBroadcasted = true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 18f2d24..3915225 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -348,9 +348,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke Thread.sleep(100); } } - } catch(InterruptedException ie) { LOG.error(ie); + } catch (Throwable t) { + LOG.error(t); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/88d04346/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index 2e3b899..2a2b8c3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -446,6 +446,46 @@ public class TestJoinBroadcast extends QueryTestCaseBase { cleanupQuery(res); } + @Test + public final void testCasebyCase1() throws Exception { + // Left outer join with a small table and a large partition table which not matched any partition path. + String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable"); + testBase.execute( + "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" + + "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" + + "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" + + "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" + + "partition by column(l_orderkey int4) ").close(); + TajoTestingCluster cluster = testBase.getTestingCluster(); + CatalogService catalog = cluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + executeString("insert overwrite into " + tableName + + " select l_partkey, l_suppkey, l_linenumber, \n" + + " l_quantity, l_extendedprice, l_discount, l_tax, \n" + + " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" + + " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem_large"); + + ResultSet res = executeString( + "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " + + "left outer join " + tableName + " b " + + "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000" + ); + + String expected = "key1,key2\n" + + "-------------------------------\n" + + "1,null\n" + + "1,null\n" + + "2,null\n" + + "3,null\n" + + "3,null\n"; + + try { + assertEquals(expected, resultSetToString(res)); + } finally { + cleanupQuery(res); + } + } static interface TupleCreator { public Tuple createTuple(String[] columnDatas);
