TAJO-839: If all tables participate in a BROADCAST JOIN, some data is missing. (Hyoungjun Kim via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/43875ba3 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/43875ba3 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/43875ba3 Branch: refs/heads/window_function Commit: 43875ba36b577a9819bd1794f8ea864e8c95077d Parents: f3092c4 Author: Jihoon Son <[email protected]> Authored: Thu Jun 12 13:51:55 2014 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Jun 12 13:51:55 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../tajo/catalog/statistics/TableStats.java | 4 + .../apache/tajo/engine/planner/LogicalPlan.java | 10 +- .../engine/planner/PhysicalPlannerImpl.java | 12 +-- .../tajo/engine/planner/enforce/Enforcer.java | 18 ++++ .../engine/planner/global/ExecutionBlock.java | 5 + .../engine/planner/physical/HashJoinExec.java | 26 +++++ .../physical/PartitionMergeScanExec.java | 10 ++ .../engine/planner/physical/SeqScanExec.java | 28 +++-- .../apache/tajo/engine/utils/TupleCacheKey.java | 14 +-- .../tajo/engine/utils/TupleCacheScanner.java | 7 +- .../tajo/master/DefaultTaskScheduler.java | 3 +- .../tajo/master/querymaster/Repartitioner.java | 77 ++++++++++---- .../main/java/org/apache/tajo/worker/Task.java | 21 ++-- .../tajo/engine/query/TestJoinBroadcast.java | 104 ++++++++++++++++++- .../apache/tajo/engine/util/TestTupleCache.java | 2 +- .../querymaster/TestQueryUnitStatusUpdate.java | 11 +- .../customer_partition_ddl.sql | 9 ++ .../insert_into_customer_partition.sql | 11 ++ .../TestJoinBroadcast/nation_multifile_ddl.sql | 5 + .../TestJoinBroadcast/orders_multifile_ddl.sql | 5 + .../testBroadcastPartitionTable.sql | 16 +++ .../queries/TestQueryUnitStatusUpdate/case3.sql | 3 +- .../testBroadcastPartitionTable.result | 5 + .../TestQueryUnitStatusUpdate/case3.result | 4 + 25 files changed, 350 insertions(+), 63 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 96003e4..7fe6e62 100644 --- a/CHANGES +++ b/CHANGES @@ -68,6 +68,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-839: If all tables participate in the BROADCAST JOIN, there is some + missing data. (Hyoungjun Kim via jihoon) + TAJO-868: TestDateTimeFunctions unit test is occasionally failed. (hyunsik) TAJO-863: Column order mismatched in the JOIN query with asterisk selection. http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java index dda8cd3..c04545c 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java @@ -47,6 +47,10 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson @Expose private List<ColumnStats> columnStatses = null; // repeated public TableStats() { + reset(); + } + + public void reset() { numRows = 0l; numBytes = 0l; numBlocks = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java index 443ee4b..0508bac 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java @@ -489,14 +489,20 @@ public class LogicalPlan { sb.append(queryBlockGraph.toStringGraph(getRootBlock().getName())); sb.append("-----------------------------\n"); sb.append("Optimization Log:\n"); + if (!planingHistory.isEmpty()) { + sb.append("[LogicalPlan]\n"); + for (String eachHistory: planingHistory) { + sb.append("\t> ").append(eachHistory).append("\n"); + } + } DirectedGraphCursor<String, BlockEdge> cursor = new DirectedGraphCursor<String, BlockEdge>(queryBlockGraph, getRootBlock().getName()); while(cursor.hasNext()) { QueryBlock block = getBlock(cursor.nextBlock()); if (block.getPlanHistory().size() > 0) { - sb.append("\n[").append(block.getName()).append("]\n"); + sb.append("[").append(block.getName()).append("]\n"); for (String log : block.getPlanHistory()) { - sb.append("> ").append(log).append("\n"); + sb.append("\t> ").append(log).append("\n"); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index e508d2c..f41d61d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -844,16 +844,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) throws IOException { - if (ctx.getTable(scanNode.getCanonicalName()) == null) { - return new SeqScanExec(ctx, sm, scanNode, null); - } - 
Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()), - "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")"); - // check if an input is sorted in the same order to the subsequence sort operator. // TODO - it works only if input files are raw files. We should check the file format. // Since the default intermediate file format is raw file, it is not problem right now. if (checkIfSortEquivalance(ctx, scanNode, node)) { + if (ctx.getTable(scanNode.getCanonicalName()) == null) { + return new SeqScanExec(ctx, sm, scanNode, null); + } FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments); } else { @@ -886,6 +883,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { } } + if (ctx.getTable(scanNode.getCanonicalName()) == null) { + return new SeqScanExec(ctx, sm, scanNode, null); + } FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); return new SeqScanExec(ctx, sm, scanNode, fragments); } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index 742736c..36820cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -169,6 +169,24 @@ public class Enforcer implements ProtoObject<EnforcerProto> { TUtil.putToNestedList(properties, builder.getType(), builder.build()); } + public void removeBroadcast(String tableName) { + List<EnforceProperty> enforces = properties.get(EnforceType.BROADCAST); + if (enforces == null) { + return; + } + + 
EnforceProperty found = null; + for (EnforceProperty eachProperty: enforces) { + BroadcastEnforce enforce = eachProperty.getBroadcast(); + if (enforce != null && tableName.equals(enforce.getTableName())) { + found = eachProperty; + } + } + if (found != null) { + enforces.remove(found); + } + } + public void enforceColumnPartitionAlgorithm(int pid, ColumnPartitionAlgorithm algorithm) { EnforceProperty.Builder builder = newProperty(); ColumnPartitionEnforcer.Builder enforce = ColumnPartitionEnforcer.newBuilder(); http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index 7df6b43..b731cec 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -113,6 +113,11 @@ public class ExecutionBlock { enforcer.addBroadcast(tableName); } + public void removeBroadcastTable(String tableName) { + broadcasted.remove(tableName); + enforcer.removeBroadcast(tableName); + } + public boolean isBroadcastTable(String tableName) { return broadcasted.contains(tableName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index dea0340..a5e9df0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.Projector; @@ -199,4 +200,29 @@ public class HashJoinExec extends BinaryPhysicalExec { return this.plan; } + @Override + public TableStats getInputStats() { + if (leftChild == null) { + return inputStats; + } + TableStats leftInputStats = leftChild.getInputStats(); + inputStats.setNumBytes(0); + inputStats.setReadBytes(0); + inputStats.setNumRows(0); + + if (leftInputStats != null) { + inputStats.setNumBytes(leftInputStats.getNumBytes()); + inputStats.setReadBytes(leftInputStats.getReadBytes()); + inputStats.setNumRows(leftInputStats.getNumRows()); + } + + TableStats rightInputStats = rightChild.getInputStats(); + if (rightInputStats != null) { + inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); + inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); + inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows()); + } + + return inputStats; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index 7f86ba2..9fa5b76 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -103,6 +103,7 @@ public class 
PartitionMergeScanExec extends PhysicalExec { @Override public void close() throws IOException { + inputStats.reset(); for (SeqScanExec scanner : scanners) { scanner.close(); TableStats scannerTableStsts = scanner.getInputStats(); @@ -138,6 +139,15 @@ public class PartitionMergeScanExec extends PhysicalExec { @Override public TableStats getInputStats() { + if (iterator != null) { + inputStats.reset(); + for (SeqScanExec scanner : scanners) { + TableStats scannerTableStats = scanner.getInputStats(); + if (scannerTableStats != null) { + inputStats.merge(scannerTableStats); + } + } + } return inputStats; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 6dbcc3f..0a2b279 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -23,6 +23,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.eval.ConstEval; @@ -32,15 +33,20 @@ import org.apache.tajo.engine.eval.FieldEval; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.planner.Target; import org.apache.tajo.engine.planner.logical.ScanNode; -import org.apache.tajo.engine.utils.*; +import org.apache.tajo.engine.utils.SchemaUtil; +import org.apache.tajo.engine.utils.TupleCache; +import 
org.apache.tajo.engine.utils.TupleCacheKey; +import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.storage.*; -import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; public class SeqScanExec extends PhysicalExec { @@ -69,8 +75,17 @@ public class SeqScanExec extends PhysicalExec { this.fragments = fragments; if (plan.isBroadcastTable()) { + String pathNameKey = ""; + if (fragments != null) { + for (FragmentProto f : fragments) { + FileFragment fileFragement = (FileFragment) FragmentConvertor.convert( + context.getConf(), plan.getTableDesc().getMeta().getStoreType(), f); + pathNameKey += fileFragement.getPath().getParent().getName(); + } + } + cacheKey = new TupleCacheKey( - context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName()); + context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey); } } @@ -182,7 +197,6 @@ public class SeqScanExec extends PhysicalExec { private void initScanner(Schema projected) throws IOException { this.projector = new Projector(inSchema, outSchema, plan.getTargets()); - if (fragments != null) { if (fragments.length > 1) { this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(), @@ -308,9 +322,9 @@ public class SeqScanExec extends PhysicalExec { @Override public String toString() { if (scanner != null) { - return "SeqScanExec:" + plan.getTableName() + "," + scanner.getClass().getName(); + return "SeqScanExec:" + plan + "," + scanner.getClass().getName(); } else { - return "SeqScanExec:" + plan.getTableName(); + return "SeqScanExec:" + plan; } } } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java index ad9204f..6f39d32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java @@ -21,18 +21,12 @@ package org.apache.tajo.engine.utils; public class TupleCacheKey { String ebId; String tableName; + String pathName; - public TupleCacheKey(String ebId, String tableName) { + public TupleCacheKey(String ebId, String tableName, String pathName) { this.ebId = ebId; this.tableName = tableName; - } - - public String getEbId() { - return ebId; - } - - public void setEbId(String ebId) { - this.ebId = ebId; + this.pathName = pathName; } public String getTableName() { @@ -55,6 +49,6 @@ public class TupleCacheKey { @Override public String toString() { - return ebId + "," + tableName; + return ebId + "," + tableName + "," + pathName; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java index 3b91f94..743d70c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java @@ -51,7 +51,12 @@ public class TupleCacheScanner implements Scanner { public Tuple next() throws IOException { if (it.hasNext()) { count++; - return it.next(); + Tuple tuple = it.next(); + try { + return (Tuple)tuple.clone(); + } 
catch (CloneNotSupportedException e) { + throw new IOException(e.getMessage(), e); + } } else { return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 5bfac8b..94d0381 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -190,7 +190,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { scheduledObjectNum++; if (castEvent.hasRightFragments()) { task.addFragments(castEvent.getRightFragments()); - //scheduledObjectNum += castEvent.getRightFragments().size(); } subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } else { @@ -821,7 +820,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { host, container.getTaskPort())); assignedRequest.add(attemptId); - scheduledObjectNum -= task.getAllFragments().size(); + scheduledObjectNum--; taskRequest.getCallback().run(taskAssign.getProto()); } else { throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 292ae13..6c000a1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -131,17 +131,29 @@ public class 
Repartitioner { // Assigning either fragments or fetch urls to query units boolean isAllBroadcastTable = true; int baseScanIdx = -1; + long maxStats = Long.MIN_VALUE; + int maxStatsScanIdx = -1; for (int i = 0; i < scans.length; i++) { if (!execBlock.isBroadcastTable(scans[i].getCanonicalName())) { isAllBroadcastTable = false; baseScanIdx = i; } + // finding largest table. + if (stats[i] > maxStats) { + maxStats = stats[i]; + maxStatsScanIdx = i; + } } + if (isAllBroadcastTable) { - LOG.info("[Distributed Join Strategy] : Immediate " + fragments.length + " Way Join on Single Machine"); - SubQuery.scheduleFragment(subQuery, fragments[0], Arrays.asList(Arrays.copyOfRange(fragments, 1, fragments.length))); - schedulerContext.setEstimatedTaskNum(1); + // set largest table to normal mode + baseScanIdx = maxStatsScanIdx; + scans[baseScanIdx].setBroadcastTable(false); + execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName()); + LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join with all tables, base_table=%s, base_volume=%d", + scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); + scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments); } else if (!execBlock.getBroadcastTables().isEmpty()) { LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); @@ -250,7 +262,6 @@ public class Repartitioner { int baseScanId, FileFragment[] fragments) throws IOException { ExecutionBlock execBlock = subQuery.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); - //Preconditions.checkArgument(scans.length == 2, "Must be Join Query"); for (int i = 0; i < scans.length; i++) { if (i != baseScanId) { @@ -258,25 +269,53 @@ public class Repartitioner { } } - TableMeta meta; - ScanNode scan = scans[baseScanId]; - TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName()); - meta = 
desc.getMeta(); + // Large table(baseScan) + // -> add all fragment to baseFragments + // -> each fragment is assigned to a Task by DefaultTaskScheduler.handle() + // Broadcast table + // all fragments or paths assigned every Large table's scan task. + // -> PARTITIONS_SCAN + // . add all partition paths to node's inputPaths variable + // -> SCAN + // . add all fragments to broadcastFragments + Collection<FileFragment> baseFragments = null; + List<FileFragment> broadcastFragments = new ArrayList<FileFragment>(); + for (int i = 0; i < scans.length; i++) { + ScanNode scan = scans[i]; + TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName()); + TableMeta meta = desc.getMeta(); + + Collection<FileFragment> scanFragments; + Path[] partitionScanPaths = null; + if (scan.getType() == NodeType.PARTITIONS_SCAN) { + PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; + partitionScanPaths = partitionScan.getInputPaths(); + // set null to inputPaths in getFragmentsFromPartitionedTable() + scanFragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc); + } else { + scanFragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(), + desc.getPath()); + } - Collection<FileFragment> baseFragments; - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - baseFragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc); - } else { - baseFragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(), - desc.getPath()); + if (scanFragments != null) { + if (i == baseScanId) { + baseFragments = scanFragments; + } else { + if (scan.getType() == NodeType.PARTITIONS_SCAN) { + PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; + // PhisicalPlanner make PartitionMergeScanExec when table is boradcast table and inputpaths is not empty + partitionScan.setInputPaths(partitionScanPaths); + } else { + 
broadcastFragments.addAll(scanFragments); + } + } + } } - List<FileFragment> broadcastFragments = new ArrayList<FileFragment>(); - for (int i = 0; i < fragments.length; i++) { - if (i != baseScanId) { - broadcastFragments.add(fragments[i]); - } + if (baseFragments == null) { + throw new IOException("No fragments for " + scans[baseScanId].getTableName()); } + SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments); schedulerContext.setEstimatedTaskNum(baseFragments.size()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 84f41a2..c6e2b73 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -39,10 +39,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.planner.PlannerUtil; -import org.apache.tajo.engine.planner.logical.LogicalNode; -import org.apache.tajo.engine.planner.logical.NodeType; -import org.apache.tajo.engine.planner.logical.ScanNode; -import org.apache.tajo.engine.planner.logical.SortNode; +import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.QueryUnitRequest; @@ -151,9 +148,19 @@ public class Task { plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class); LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); - for (LogicalNode node : scanNode) { - ScanNode scan = (ScanNode)node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); + if (scanNode != null) { + for (LogicalNode 
node : scanNode) { + ScanNode scan = (ScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } + } + + LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); + if (partitionScanNode != null) { + for (LogicalNode node : partitionScanNode) { + PartitionedTableScanNode scan = (PartitionedTableScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } } interQuery = request.getProto().getInterQuery(); http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index 1581372..ae87c64 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -18,29 +18,43 @@ package org.apache.tajo.engine.query; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryId; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.Int4Datum; +import org.apache.tajo.datum.TextDatum; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.storage.Appender; +import 
org.apache.tajo.storage.StorageManagerFactory; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.FileUtil; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.File; import java.sql.ResultSet; -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; -import static junit.framework.TestCase.fail; +import static junit.framework.TestCase.*; import static org.junit.Assert.assertNotNull; @Category(IntegrationTest.class) public class TestJoinBroadcast extends QueryTestCaseBase { + private static final Log LOG = LogFactory.getLog(TestJoinBroadcast.class); public TestJoinBroadcast() throws Exception { super(TajoConstants.DEFAULT_DATABASE_NAME); testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "true"); @@ -375,4 +389,88 @@ public class TestJoinBroadcast extends QueryTestCaseBase { cleanupQuery(res); } + @Test + public final void testBroadcastPartitionTable() throws Exception { + // https://issues.apache.org/jira/browse/TAJO-839 + // If all tables participate in the BROADCAST JOIN, there is some missing data. 
+ executeDDL("customer_partition_ddl.sql", null); + ResultSet res = executeFile("insert_into_customer_partition.sql"); + res.close(); + + createMultiFile("nation", 2, new TupleCreator() { + public Tuple createTuple(String[] columnDatas) { + return new VTuple(new Datum[]{ + new Int4Datum(Integer.parseInt(columnDatas[0])), + new TextDatum(columnDatas[1]), + new Int4Datum(Integer.parseInt(columnDatas[2])), + new TextDatum(columnDatas[3]) + }); + } + }); + + createMultiFile("orders", 1, new TupleCreator() { + public Tuple createTuple(String[] columnDatas) { + return new VTuple(new Datum[]{ + new Int4Datum(Integer.parseInt(columnDatas[0])), + new Int4Datum(Integer.parseInt(columnDatas[1])), + new TextDatum(columnDatas[2]) + }); + } + }); + + res = executeQuery(); + assertResultSet(res); + res.close(); + + executeString("DROP TABLE customer_broad_parts PURGE"); + executeString("DROP TABLE nation_multifile PURGE"); + executeString("DROP TABLE orders_multifile PURGE"); + } + + static interface TupleCreator { + public Tuple createTuple(String[] columnDatas); + } + + private void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception { + // make multiple small file + String multiTableName = tableName + "_multifile"; + executeDDL(multiTableName + "_ddl.sql", null); + + TableDesc table = client.getTableDesc(multiTableName); + assertNotNull(table); + + TableMeta tableMeta = table.getMeta(); + Schema schema = table.getLogicalSchema(); + + File file = new File("src/test/tpch/" + tableName + ".tbl"); + + if (!file.exists()) { + file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + tableName + ".tbl"); + } + String[] rows = FileUtil.readTextFile(file).split("\n"); + + assertTrue(rows.length > 0); + + int fileIndex = 0; + + Appender appender = null; + for (int i = 0; i < rows.length; i++) { + if (i % numRowsEachFile == 0) { + if (appender != null) { + appender.flush(); + appender.close(); + } + Path dataPath = new 
Path(table.getPath(), fileIndex + ".csv"); + fileIndex++; + appender = StorageManagerFactory.getStorageManager(conf).getAppender(tableMeta, schema, + dataPath); + appender.init(); + } + String[] columnDatas = rows[i].split("\\|"); + Tuple tuple = tupleCreator.createTuple(columnDatas); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java index 1cbbdf9..3d2f307 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java @@ -53,7 +53,7 @@ public class TestTupleCache { ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId( QueryIdFactory.newQueryId(System.currentTimeMillis(), 0)); - TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable"); + TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable", "test"); TupleCache tupleCache = TupleCache.getInstance(); assertFalse(tupleCache.isBroadcastCacheReady(cacheKey)); http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java index fa89dc3..c52b277 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java @@ -86,9 +86,13 @@ 
public class TestQueryUnitStatusUpdate extends QueryTestCaseBase { res = executeQuery(); - long[] expectedNumRows = new long[]{7, 2, 2, 2, 7, 2, 2, 2}; - long[] expectedNumBytes = new long[]{63, 34, 34, 18, 109, 34, 34, 18}; - long[] expectedReadBytes = new long[]{63, 0, 34, 0, 109, 0, 34, 0}; + String actualResult = resultSetToString(res); + System.out.println(actualResult); + + // first stage's num rows = (left: 1 , right: 2 (filtered)) * 5 (tasks) + long[] expectedNumRows = new long[]{15, 2, 2, 2, 7, 2, 2, 2}; + long[] expectedNumBytes = new long[]{45, 34, 34, 18, 109, 34, 34, 18}; + long[] expectedReadBytes = new long[]{45, 0, 34, 0, 109, 0, 34, 0}; assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { @@ -109,6 +113,7 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase { res = testBase.execute( "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem"); + res.close(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql new file mode 100644 index 0000000..7d07474 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql @@ -0,0 +1,9 @@ +CREATE TABLE customer_broad_parts ( + c_nationkey INT4, + c_name TEXT, + c_address TEXT, + c_phone TEXT, + c_acctbal FLOAT8, + c_mktsegment TEXT, + c_comment TEXT +) PARTITION BY COLUMN (c_custkey INT4); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql new file mode 100644 index 0000000..3a500a1 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql @@ -0,0 +1,11 @@ +INSERT OVERWRITE INTO customer_broad_parts + SELECT + c_nationkey, + c_name, + c_address, + c_phone, + c_acctbal, + c_mktsegment, + c_comment, + c_custkey + FROM customer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql new file mode 100644 index 0000000..c3f595a --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql @@ -0,0 +1,5 @@ +create table nation_multifile ( + n_nationkey int, + n_name text, + n_regionkey int, + n_comment text); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql new file mode 100644 index 0000000..64f70f2 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql @@ -0,0 +1,5 @@ +create table orders_multifile ( + o_orderkey int, + o_custkey int, + o_orderstatus text +); \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql new file mode 100644 index 0000000..800375b --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql @@ -0,0 +1,16 @@ +select + c_custkey, + c_name, + c_nationkey, + n_nationkey, + o_orderkey +from + customer_broad_parts, + nation_multifile, + orders_multifile +where + c_nationkey = n_nationkey +and + o_custkey = c_custkey +order by + c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql index 9c9362e..a0f9c78 100644 --- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql +++ b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql @@ -7,5 +7,4 @@ from ( (a.key = 45.0 or a.key = 38.0) ) test order by - col1, col2 -; \ No newline at end of file + col1, col2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result new file mode 100644 index 0000000..c03a275 --- 
/dev/null +++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result @@ -0,0 +1,5 @@ +c_custkey,c_name,c_nationkey,n_nationkey,o_orderkey +------------------------------- +2,Customer#000000002,13,13,3 +3,Customer#000000003,1,1,1 +4,Customer#000000004,4,4,2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result b/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result new file mode 100644 index 0000000..025d0b4 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result @@ -0,0 +1,4 @@ +col1,col2,key +------------------------------- +2,2,38.0 +3,2,45.0 \ No newline at end of file
