TAJO-839: If all tables participate in the BROADCAST JOIN, some data is 
missing. (Hyoungjun Kim via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/43875ba3
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/43875ba3
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/43875ba3

Branch: refs/heads/window_function
Commit: 43875ba36b577a9819bd1794f8ea864e8c95077d
Parents: f3092c4
Author: Jihoon Son <[email protected]>
Authored: Thu Jun 12 13:51:55 2014 +0900
Committer: Jihoon Son <[email protected]>
Committed: Thu Jun 12 13:51:55 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../tajo/catalog/statistics/TableStats.java     |   4 +
 .../apache/tajo/engine/planner/LogicalPlan.java |  10 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  12 +--
 .../tajo/engine/planner/enforce/Enforcer.java   |  18 ++++
 .../engine/planner/global/ExecutionBlock.java   |   5 +
 .../engine/planner/physical/HashJoinExec.java   |  26 +++++
 .../physical/PartitionMergeScanExec.java        |  10 ++
 .../engine/planner/physical/SeqScanExec.java    |  28 +++--
 .../apache/tajo/engine/utils/TupleCacheKey.java |  14 +--
 .../tajo/engine/utils/TupleCacheScanner.java    |   7 +-
 .../tajo/master/DefaultTaskScheduler.java       |   3 +-
 .../tajo/master/querymaster/Repartitioner.java  |  77 ++++++++++----
 .../main/java/org/apache/tajo/worker/Task.java  |  21 ++--
 .../tajo/engine/query/TestJoinBroadcast.java    | 104 ++++++++++++++++++-
 .../apache/tajo/engine/util/TestTupleCache.java |   2 +-
 .../querymaster/TestQueryUnitStatusUpdate.java  |  11 +-
 .../customer_partition_ddl.sql                  |   9 ++
 .../insert_into_customer_partition.sql          |  11 ++
 .../TestJoinBroadcast/nation_multifile_ddl.sql  |   5 +
 .../TestJoinBroadcast/orders_multifile_ddl.sql  |   5 +
 .../testBroadcastPartitionTable.sql             |  16 +++
 .../queries/TestQueryUnitStatusUpdate/case3.sql |   3 +-
 .../testBroadcastPartitionTable.result          |   5 +
 .../TestQueryUnitStatusUpdate/case3.result      |   4 +
 25 files changed, 350 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 96003e4..7fe6e62 100644
--- a/CHANGES
+++ b/CHANGES
@@ -68,6 +68,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-839: If all tables participate in the BROADCAST JOIN, there is some 
+    missing data. (Hyoungjun Kim via jihoon)
+
     TAJO-868: TestDateTimeFunctions unit test is occasionally failed. (hyunsik)
 
     TAJO-863: Column order mismatched in the JOIN query with asterisk 
selection. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
index dda8cd3..c04545c 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
@@ -47,6 +47,10 @@ public class TableStats implements 
ProtoObject<TableStatsProto>, Cloneable, Gson
   @Expose private List<ColumnStats> columnStatses = null; // repeated
 
   public TableStats() {
+    reset();
+  }
+
+  public void reset() {
     numRows = 0l;
     numBytes = 0l;
     numBlocks = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 443ee4b..0508bac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -489,14 +489,20 @@ public class LogicalPlan {
     sb.append(queryBlockGraph.toStringGraph(getRootBlock().getName()));
     sb.append("-----------------------------\n");
     sb.append("Optimization Log:\n");
+    if (!planingHistory.isEmpty()) {
+      sb.append("[LogicalPlan]\n");
+      for (String eachHistory: planingHistory) {
+        sb.append("\t> ").append(eachHistory).append("\n");
+      }
+    }
     DirectedGraphCursor<String, BlockEdge> cursor =
         new DirectedGraphCursor<String, BlockEdge>(queryBlockGraph, 
getRootBlock().getName());
     while(cursor.hasNext()) {
       QueryBlock block = getBlock(cursor.nextBlock());
       if (block.getPlanHistory().size() > 0) {
-        sb.append("\n[").append(block.getName()).append("]\n");
+        sb.append("[").append(block.getName()).append("]\n");
         for (String log : block.getPlanHistory()) {
-          sb.append("> ").append(log).append("\n");
+          sb.append("\t> ").append(log).append("\n");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index e508d2c..f41d61d 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -844,16 +844,13 @@ public class PhysicalPlannerImpl implements 
PhysicalPlanner {
 
   public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode 
scanNode, Stack<LogicalNode> node)
       throws IOException {
-    if (ctx.getTable(scanNode.getCanonicalName()) == null) {
-      return new SeqScanExec(ctx, sm, scanNode, null);
-    }
-    Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
-        "Error: There is no table matched to %s", scanNode.getCanonicalName() 
+ "(" + scanNode.getTableName() + ")");    
-
     // check if an input is sorted in the same order to the subsequence sort 
operator.
     // TODO - it works only if input files are raw files. We should check the 
file format.
     // Since the default intermediate file format is raw file, it is not 
problem right now.
     if (checkIfSortEquivalance(ctx, scanNode, node)) {
+      if (ctx.getTable(scanNode.getCanonicalName()) == null) {
+        return new SeqScanExec(ctx, sm, scanNode, null);
+      }
       FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
       return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments);
     } else {
@@ -886,6 +883,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner 
{
         }
       }
 
+      if (ctx.getTable(scanNode.getCanonicalName()) == null) {
+        return new SeqScanExec(ctx, sm, scanNode, null);
+      }
       FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
       return new SeqScanExec(ctx, sm, scanNode, fragments);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 742736c..36820cc 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -169,6 +169,24 @@ public class Enforcer implements 
ProtoObject<EnforcerProto> {
     TUtil.putToNestedList(properties, builder.getType(), builder.build());
   }
 
+  public void removeBroadcast(String tableName) {
+    List<EnforceProperty> enforces = properties.get(EnforceType.BROADCAST);
+    if (enforces == null) {
+      return;
+    }
+
+    EnforceProperty found = null;
+    for (EnforceProperty eachProperty: enforces) {
+      BroadcastEnforce enforce = eachProperty.getBroadcast();
+      if (enforce != null && tableName.equals(enforce.getTableName())) {
+        found = eachProperty;
+      }
+    }
+    if (found != null) {
+      enforces.remove(found);
+    }
+  }
+
   public void enforceColumnPartitionAlgorithm(int pid, 
ColumnPartitionAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     ColumnPartitionEnforcer.Builder enforce = 
ColumnPartitionEnforcer.newBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 7df6b43..b731cec 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -113,6 +113,11 @@ public class ExecutionBlock {
     enforcer.addBroadcast(tableName);
   }
 
+  public void removeBroadcastTable(String tableName) {
+    broadcasted.remove(tableName);
+    enforcer.removeBroadcast(tableName);
+  }
+
   public boolean isBroadcastTable(String tableName) {
     return broadcasted.contains(tableName);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index dea0340..a5e9df0 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -199,4 +200,29 @@ public class HashJoinExec extends BinaryPhysicalExec {
     return this.plan;
   }
 
+  @Override
+  public TableStats getInputStats() {
+    if (leftChild == null) {
+      return inputStats;
+    }
+    TableStats leftInputStats = leftChild.getInputStats();
+    inputStats.setNumBytes(0);
+    inputStats.setReadBytes(0);
+    inputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      inputStats.setNumBytes(leftInputStats.getNumBytes());
+      inputStats.setReadBytes(leftInputStats.getReadBytes());
+      inputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = rightChild.getInputStats();
+    if (rightInputStats != null) {
+      inputStats.setNumBytes(inputStats.getNumBytes() + 
rightInputStats.getNumBytes());
+      inputStats.setReadBytes(inputStats.getReadBytes() + 
rightInputStats.getReadBytes());
+      inputStats.setNumRows(inputStats.getNumRows() + 
rightInputStats.getNumRows());
+    }
+
+    return inputStats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 7f86ba2..9fa5b76 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -103,6 +103,7 @@ public class PartitionMergeScanExec extends PhysicalExec {
 
   @Override
   public void close() throws IOException {
+    inputStats.reset();
     for (SeqScanExec scanner : scanners) {
       scanner.close();
       TableStats scannerTableStsts = scanner.getInputStats();
@@ -138,6 +139,15 @@ public class PartitionMergeScanExec extends PhysicalExec {
 
   @Override
   public TableStats getInputStats() {
+    if (iterator != null) {
+      inputStats.reset();
+      for (SeqScanExec scanner : scanners) {
+        TableStats scannerTableStats = scanner.getInputStats();
+        if (scannerTableStats != null) {
+          inputStats.merge(scannerTableStats);
+        }
+      }
+    }
     return inputStats;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 6dbcc3f..0a2b279 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -23,6 +23,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.ConstEval;
@@ -32,15 +33,20 @@ import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.utils.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.engine.utils.TupleCacheKey;
+import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 
 public class SeqScanExec extends PhysicalExec {
@@ -69,8 +75,17 @@ public class SeqScanExec extends PhysicalExec {
     this.fragments = fragments;
 
     if (plan.isBroadcastTable()) {
+      String pathNameKey = "";
+      if (fragments != null) {
+        for (FragmentProto f : fragments) {
+          FileFragment fileFragement = (FileFragment) 
FragmentConvertor.convert(
+              context.getConf(), plan.getTableDesc().getMeta().getStoreType(), 
f);
+          pathNameKey += fileFragement.getPath().getParent().getName();
+        }
+      }
+
       cacheKey = new TupleCacheKey(
-          
context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), 
plan.getTableName());
+          
context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), 
plan.getTableName(), pathNameKey);
     }
   }
 
@@ -182,7 +197,6 @@ public class SeqScanExec extends PhysicalExec {
 
   private void initScanner(Schema projected) throws IOException {
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-
     if (fragments != null) {
       if (fragments.length > 1) {
         this.scanner = new MergeScanner(context.getConf(), 
plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
@@ -308,9 +322,9 @@ public class SeqScanExec extends PhysicalExec {
   @Override
   public String toString() {
     if (scanner != null) {
-      return "SeqScanExec:" + plan.getTableName() + "," + 
scanner.getClass().getName();
+      return "SeqScanExec:" + plan + "," + scanner.getClass().getName();
     } else {
-      return "SeqScanExec:" + plan.getTableName();
+      return "SeqScanExec:" + plan;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
index ad9204f..6f39d32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
@@ -21,18 +21,12 @@ package org.apache.tajo.engine.utils;
 public class TupleCacheKey {
   String ebId;
   String tableName;
+  String pathName;
 
-  public TupleCacheKey(String ebId, String tableName) {
+  public TupleCacheKey(String ebId, String tableName, String pathName) {
     this.ebId = ebId;
     this.tableName = tableName;
-  }
-
-  public String getEbId() {
-    return ebId;
-  }
-
-  public void setEbId(String ebId) {
-    this.ebId = ebId;
+    this.pathName = pathName;
   }
 
   public String getTableName() {
@@ -55,6 +49,6 @@ public class TupleCacheKey {
 
   @Override
   public String toString() {
-    return ebId + "," + tableName;
+    return ebId + "," + tableName + "," + pathName;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
index 3b91f94..743d70c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
@@ -51,7 +51,12 @@ public class TupleCacheScanner implements Scanner {
   public Tuple next() throws IOException {
     if (it.hasNext()) {
       count++;
-      return it.next();
+      Tuple tuple = it.next();
+      try {
+        return (Tuple)tuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java 
b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 5bfac8b..94d0381 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -190,7 +190,6 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
           scheduledObjectNum++;
           if (castEvent.hasRightFragments()) {
             task.addFragments(castEvent.getRightFragments());
-            //scheduledObjectNum += castEvent.getRightFragments().size();
           }
           subQuery.getEventHandler().handle(new TaskEvent(task.getId(), 
TaskEventType.T_SCHEDULE));
         } else {
@@ -821,7 +820,7 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
               host, container.getTaskPort()));
           assignedRequest.add(attemptId);
 
-          scheduledObjectNum -= task.getAllFragments().size();
+          scheduledObjectNum--;
           taskRequest.getCallback().run(taskAssign.getProto());
         } else {
           throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 292ae13..6c000a1 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -131,17 +131,29 @@ public class Repartitioner {
     // Assigning either fragments or fetch urls to query units
     boolean isAllBroadcastTable = true;
     int baseScanIdx = -1;
+    long maxStats = Long.MIN_VALUE;
+    int maxStatsScanIdx = -1;
     for (int i = 0; i < scans.length; i++) {
       if (!execBlock.isBroadcastTable(scans[i].getCanonicalName())) {
         isAllBroadcastTable = false;
         baseScanIdx = i;
       }
+      // finding largest table.
+      if (stats[i] > maxStats) {
+        maxStats = stats[i];
+        maxStatsScanIdx = i;
+      }
     }
 
+
     if (isAllBroadcastTable) {
-      LOG.info("[Distributed Join Strategy] : Immediate " +  fragments.length 
+ " Way Join on Single Machine");
-      SubQuery.scheduleFragment(subQuery, fragments[0], 
Arrays.asList(Arrays.copyOfRange(fragments, 1, fragments.length)));
-      schedulerContext.setEstimatedTaskNum(1);
+      // set largest table to normal mode
+      baseScanIdx = maxStatsScanIdx;
+      scans[baseScanIdx].setBroadcastTable(false);
+      execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName());
+      LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join 
with all tables, base_table=%s, base_volume=%d",
+          scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
+      scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, 
baseScanIdx, fragments);
     } else if (!execBlock.getBroadcastTables().isEmpty()) {
       LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, 
base_table=%s, base_volume=%d",
           scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
@@ -250,7 +262,6 @@ public class Repartitioner {
                                                           int baseScanId, 
FileFragment[] fragments) throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode[] scans = execBlock.getScanNodes();
-    //Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
 
     for (int i = 0; i < scans.length; i++) {
       if (i != baseScanId) {
@@ -258,25 +269,53 @@ public class Repartitioner {
       }
     }
 
-    TableMeta meta;
-    ScanNode scan = scans[baseScanId];
-    TableDesc desc = 
subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
-    meta = desc.getMeta();
+    // Large table(baseScan)
+    //  -> add all fragment to baseFragments
+    //  -> each fragment is assigned to a Task by DefaultTaskScheduler.handle()
+    // Broadcast table
+    //  all fragments or paths assigned every Large table's scan task.
+    //  -> PARTITIONS_SCAN
+    //     . add all partition paths to node's inputPaths variable
+    //  -> SCAN
+    //     . add all fragments to broadcastFragments
+    Collection<FileFragment> baseFragments = null;
+    List<FileFragment> broadcastFragments = new ArrayList<FileFragment>();
+    for (int i = 0; i < scans.length; i++) {
+      ScanNode scan = scans[i];
+      TableDesc desc = 
subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
+      TableMeta meta = desc.getMeta();
+
+      Collection<FileFragment> scanFragments;
+      Path[] partitionScanPaths = null;
+      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+        PartitionedTableScanNode partitionScan = 
(PartitionedTableScanNode)scan;
+        partitionScanPaths = partitionScan.getInputPaths();
+        // set null to inputPaths in getFragmentsFromPartitionedTable()
+        scanFragments = 
getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
+      } else {
+        scanFragments = 
subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, 
desc.getSchema(),
+            desc.getPath());
+      }
 
-    Collection<FileFragment> baseFragments;
-    if (scan.getType() == NodeType.PARTITIONS_SCAN) {
-      baseFragments = 
getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
-    } else {
-      baseFragments = 
subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, 
desc.getSchema(),
-          desc.getPath());
+      if (scanFragments != null) {
+        if (i == baseScanId) {
+          baseFragments = scanFragments;
+        } else {
+          if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+            PartitionedTableScanNode partitionScan = 
(PartitionedTableScanNode)scan;
+            // PhisicalPlanner make PartitionMergeScanExec when table is 
boradcast table and inputpaths is not empty
+            partitionScan.setInputPaths(partitionScanPaths);
+          } else {
+            broadcastFragments.addAll(scanFragments);
+          }
+        }
+      }
     }
 
-    List<FileFragment> broadcastFragments = new ArrayList<FileFragment>();
-    for (int i = 0; i < fragments.length; i++) {
-      if (i != baseScanId) {
-        broadcastFragments.add(fragments[i]);
-      }
+    if (baseFragments == null) {
+      throw new IOException("No fragments for " + 
scans[baseScanId].getTableName());
     }
+
     SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments);
     schedulerContext.setEstimatedTaskNum(baseFragments.size());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 84f41a2..c6e2b73 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -39,10 +39,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.QueryUnitRequest;
@@ -151,9 +148,19 @@ public class Task {
 
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), 
LogicalNode.class);
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
-    for (LogicalNode node : scanNode) {
-      ScanNode scan = (ScanNode)node;
-      descs.put(scan.getCanonicalName(), scan.getTableDesc());
+    if (scanNode != null) {
+      for (LogicalNode node : scanNode) {
+        ScanNode scan = (ScanNode) node;
+        descs.put(scan.getCanonicalName(), scan.getTableDesc());
+      }
+    }
+
+    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, 
NodeType.PARTITIONS_SCAN);
+    if (partitionScanNode != null) {
+      for (LogicalNode node : partitionScanNode) {
+        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
+        descs.put(scan.getCanonicalName(), scan.getTableDesc());
+      }
     }
 
     interQuery = request.getProto().getInterQuery();

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 1581372..ae87c64 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -18,29 +18,43 @@
 
 package org.apache.tajo.engine.query;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.Int4Datum;
+import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.sql.ResultSet;
 
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertTrue;
-import static junit.framework.TestCase.fail;
+import static junit.framework.TestCase.*;
 import static org.junit.Assert.assertNotNull;
 
 @Category(IntegrationTest.class)
 public class TestJoinBroadcast extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestJoinBroadcast.class);
   public TestJoinBroadcast() throws Exception {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
     
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname,
 "true");
@@ -375,4 +389,88 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
     cleanupQuery(res);
   }
 
+  @Test
+  public final void testBroadcastPartitionTable() throws Exception {
+    // https://issues.apache.org/jira/browse/TAJO-839
+    // If all tables participate in the BROADCAST JOIN, there is some missing data.
+    executeDDL("customer_partition_ddl.sql", null);
+    ResultSet res = executeFile("insert_into_customer_partition.sql");
+    res.close();
+    // Split nation (2 rows per file) and orders (1 row per file) into multiple small data files.
+    createMultiFile("nation", 2, new TupleCreator() {
+      public Tuple createTuple(String[] columnDatas) {
+        return new VTuple(new Datum[]{
+            new Int4Datum(Integer.parseInt(columnDatas[0])),
+            new TextDatum(columnDatas[1]),
+            new Int4Datum(Integer.parseInt(columnDatas[2])),
+            new TextDatum(columnDatas[3])
+        });
+      }
+    });
+
+    createMultiFile("orders", 1, new TupleCreator() {
+      public Tuple createTuple(String[] columnDatas) {
+        return new VTuple(new Datum[]{
+            new Int4Datum(Integer.parseInt(columnDatas[0])),
+            new Int4Datum(Integer.parseInt(columnDatas[1])),
+            new TextDatum(columnDatas[2])
+        });
+      }
+    });
+
+    res = executeQuery();
+    assertResultSet(res);
+    res.close();
+
+    executeString("DROP TABLE customer_broad_parts PURGE");
+    executeString("DROP TABLE nation_multifile PURGE");
+    executeString("DROP TABLE orders_multifile PURGE");
+  }
+
+  interface TupleCreator {  // builds one Tuple from a '|'-split line of a .tbl file
+    Tuple createTuple(String[] columnValues);
+  }
+
+  private void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception {
+    // Copy src/test/tpch/<tableName>.tbl into <tableName>_multifile, numRowsEachFile rows per data file.
+    String multiTableName = tableName + "_multifile";
+    executeDDL(multiTableName + "_ddl.sql", null);
+    TableDesc table = client.getTableDesc(multiTableName);
+    assertNotNull(table);
+    TableMeta tableMeta = table.getMeta();
+    Schema schema = table.getLogicalSchema();
+
+    File file = new File("src/test/tpch/" + tableName + ".tbl");
+    if (!file.exists()) {
+      file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + tableName + ".tbl");
+    }
+    String[] rows = FileUtil.readTextFile(file).split("\n");
+    assertTrue(rows.length > 0);
+
+    int fileIndex = 0;
+    Appender appender = null;
+    try {
+      for (int i = 0; i < rows.length; i++) {
+        if (i % numRowsEachFile == 0) {
+          // Finish the current data file; null it so the finally block skips it.
+          if (appender != null) {
+            appender.flush();
+            appender.close();
+            appender = null;
+          }
+          Path dataPath = new Path(table.getPath(), fileIndex + ".csv");
+          fileIndex++;
+          appender = StorageManagerFactory.getStorageManager(conf).getAppender(tableMeta, schema, dataPath);
+          appender.init();
+        }
+        appender.addTuple(tupleCreator.createTuple(rows[i].split("\\|")));
+      }
+      appender.flush();
+    } finally {
+      // Close the last open file even when parsing or writing a row fails.
+      if (appender != null) {
+        appender.close();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
index 1cbbdf9..3d2f307 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java
@@ -53,7 +53,7 @@ public class TestTupleCache {
     ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(
         QueryIdFactory.newQueryId(System.currentTimeMillis(), 0));
 
-    TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable");
+    TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable", 
"test");
     TupleCache tupleCache = TupleCache.getInstance();
 
     assertFalse(tupleCache.isBroadcastCacheReady(cacheKey));

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
index fa89dc3..c52b277 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
@@ -86,9 +86,13 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase {
 
       res = executeQuery();
 
-      long[] expectedNumRows = new long[]{7, 2, 2, 2, 7, 2, 2, 2};
-      long[] expectedNumBytes = new long[]{63, 34, 34, 18, 109, 34, 34, 18};
-      long[] expectedReadBytes = new long[]{63, 0, 34, 0, 109, 0, 34, 0};
+      String actualResult = resultSetToString(res);
+      System.out.println(actualResult);
+
+      // first stage's num rows = (left: 1 , right: 2 (filtered)) * 5 (tasks)
+      long[] expectedNumRows = new long[]{15, 2, 2, 2, 7, 2, 2, 2};
+      long[] expectedNumBytes = new long[]{45, 34, 34, 18, 109, 34, 34, 18};
+      long[] expectedReadBytes = new long[]{45, 0, 34, 0, 109, 0, 34, 0};
 
       assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
@@ -109,6 +113,7 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase {
 
     res = testBase.execute(
         "insert overwrite into " + tableName + " select l_orderkey, l_partkey, 
l_quantity from lineitem");
+
     res.close();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql
new file mode 100644
index 0000000..7d07474
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/customer_partition_ddl.sql
@@ -0,0 +1,9 @@
+CREATE TABLE customer_broad_parts (
+  c_nationkey  INT4,
+  c_name       TEXT,
+  c_address    TEXT,
+  c_phone      TEXT,
+  c_acctbal    FLOAT8,
+  c_mktsegment TEXT,
+  c_comment    TEXT
+) PARTITION BY COLUMN (c_custkey INT4);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql
new file mode 100644
index 0000000..3a500a1
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/insert_into_customer_partition.sql
@@ -0,0 +1,11 @@
+INSERT OVERWRITE INTO customer_broad_parts
+SELECT
+  c_nationkey,
+  c_name,
+  c_address,
+  c_phone,
+  c_acctbal,
+  c_mktsegment,
+  c_comment,
+  c_custkey
+FROM customer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql
new file mode 100644
index 0000000..c3f595a
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/nation_multifile_ddl.sql
@@ -0,0 +1,5 @@
+CREATE TABLE nation_multifile (
+  n_nationkey INT,
+  n_name      TEXT,
+  n_regionkey INT,
+  n_comment   TEXT);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql
new file mode 100644
index 0000000..64f70f2
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/orders_multifile_ddl.sql
@@ -0,0 +1,5 @@
+CREATE TABLE orders_multifile (
+  o_orderkey    INT,
+  o_custkey     INT,
+  o_orderstatus TEXT
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql
new file mode 100644
index 0000000..800375b
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testBroadcastPartitionTable.sql
@@ -0,0 +1,16 @@
+SELECT
+  c_custkey,
+  c_name,
+  c_nationkey,
+  n_nationkey,
+  o_orderkey
+FROM
+  customer_broad_parts,
+  nation_multifile,
+  orders_multifile
+WHERE
+  c_nationkey = n_nationkey
+AND
+  o_custkey = c_custkey
+ORDER BY
+  c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
index 9c9362e..a0f9c78 100644
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
+++ b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
@@ -7,5 +7,4 @@ from (
     (a.key = 45.0 or a.key = 38.0)
 ) test
 order by
-  col1, col2
-;
\ No newline at end of file
+  col1, col2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result
new file mode 100644
index 0000000..c03a275
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastPartitionTable.result
@@ -0,0 +1,5 @@
+c_custkey,c_name,c_nationkey,n_nationkey,o_orderkey
+-------------------------------
+2,Customer#000000002,13,13,3
+3,Customer#000000003,1,1,1
+4,Customer#000000004,4,4,2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/43875ba3/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result b/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result
b/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result
new file mode 100644
index 0000000..025d0b4
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestQueryUnitStatusUpdate/case3.result
@@ -0,0 +1,4 @@
+col1,col2,key
+-------------------------------
+2,2,38.0
+3,2,45.0
\ No newline at end of file

Reply via email to