Make sure all minorFragments get work in scan

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8adb5273
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8adb5273
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8adb5273

Branch: refs/heads/master
Commit: 8adb52733601a8dfc5ca01eebdd944cabf709f0d
Parents: e7115e9
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Sat Jun 7 23:11:52 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Sat Jun 7 23:11:52 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/schedule/AssignmentCreator.java      | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8adb5273/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 4ae84fc..4610d88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -71,10 +71,11 @@ public class AssignmentCreator<T extends CompleteWork> {
 
     ArrayList<T> rowGroupList = new ArrayList<>(units);
     for (double cutoff : ASSIGNMENT_CUTOFFS) {
-      scanAndAssign(rowGroupList, cutoff, false);
+      scanAndAssign(rowGroupList, cutoff, false, false);
     }
-    scanAndAssign(rowGroupList, 0.0, true);
-    
+    scanAndAssign(rowGroupList, 0.0, true, false);
+    scanAndAssign(rowGroupList, 0.0, true, true);
+
     logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
     Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
     Preconditions.checkState(!units.isEmpty());
@@ -94,7 +95,7 @@ public class AssignmentCreator<T extends CompleteWork> {
    * @param assignAll
    *          if true, will assign even if no affinity
    */
-  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAll) {
+  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
     Collections.sort(workunits);
     int fragmentPointer = 0;
     final boolean requireAffinity = requiredPercentage > 0;
@@ -112,6 +113,7 @@ public class AssignmentCreator<T extends CompleteWork> {
         boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
 
         if (assignAll
+            || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
             || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
                 && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
                 .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {

Reply via email to