Make sure all minorFragments get work in scan
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8adb5273 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8adb5273 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8adb5273 Branch: refs/heads/master Commit: 8adb52733601a8dfc5ca01eebdd944cabf709f0d Parents: e7115e9 Author: Steven Phillips <sphill...@maprtech.com> Authored: Sat Jun 7 23:11:52 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Sat Jun 7 23:11:52 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/schedule/AssignmentCreator.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8adb5273/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java index 4ae84fc..4610d88 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java @@ -71,10 +71,11 @@ public class AssignmentCreator<T extends CompleteWork> { ArrayList<T> rowGroupList = new ArrayList<>(units); for (double cutoff : ASSIGNMENT_CUTOFFS) { - scanAndAssign(rowGroupList, cutoff, false); + scanAndAssign(rowGroupList, cutoff, false, false); } - scanAndAssign(rowGroupList, 0.0, true); - + scanAndAssign(rowGroupList, 0.0, true, false); + scanAndAssign(rowGroupList, 0.0, true, true); + logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS)); Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned"); Preconditions.checkState(!units.isEmpty()); @@ -94,7 +95,7 @@ public class AssignmentCreator<T extends CompleteWork> { * @param assignAll * if true, will assign even if no affinity */ - private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAll) { + private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) { Collections.sort(workunits); int fragmentPointer = 0; final boolean requireAffinity = requiredPercentage > 0; @@ -112,6 +113,7 @@ public class AssignmentCreator<T extends CompleteWork> { boolean haveAffinity = endpointByteMap.isSet(currentEndpoint); if (assignAll + || (assignAllToEmpty && !mappings.containsKey(minorFragmentId)) || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity) && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {