DRILL-236 ensure that each minor fragment has at least one entry assigned to it
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b5a3b55c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b5a3b55c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b5a3b55c Branch: refs/heads/master Commit: b5a3b55c511e33a05c9665d581fea0a15ccacaf9 Parents: 627c84e Author: Steven Phillips <[email protected]> Authored: Thu Sep 5 20:00:47 2013 -0700 Committer: Steven Phillips <[email protected]> Committed: Fri Sep 20 12:10:29 2013 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/store/parquet/ParquetGroupScan.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b5a3b55c/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index b789963..09dc2f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -280,7 +280,7 @@ public class ParquetGroupScan extends AbstractGroupScan { } - static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01}; + static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.00}; /** * @@ -317,7 +317,8 @@ public class ParquetGroupScan extends AbstractGroupScan { * @param requiredPercentage the percentage of max bytes required to make an assignment * @param assignAll if true, will assign even if no affinity */ - private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) { + private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, + List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) { Collections.sort(rowGroups, new ParquetReadEntryComparator()); final boolean requireAffinity = requiredPercentage > 0; int maxAssignments = (int) (rowGroups.size() / endpoints.size()); @@ -336,7 +337,7 @@ public class ParquetGroupScan extends AbstractGroupScan { (!bytesPerEndpoint.isEmpty() && (!requireAffinity || haveAffinity) && (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) && - bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) { + (!requireAffinity || bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage))) { endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry()); logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
