DRILL-236 ensure that each minor fragment has at least one entry assigned to it


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b5a3b55c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b5a3b55c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b5a3b55c

Branch: refs/heads/master
Commit: b5a3b55c511e33a05c9665d581fea0a15ccacaf9
Parents: 627c84e
Author: Steven Phillips <[email protected]>
Authored: Thu Sep 5 20:00:47 2013 -0700
Committer: Steven Phillips <[email protected]>
Committed: Fri Sep 20 12:10:29 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/store/parquet/ParquetGroupScan.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b5a3b55c/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index b789963..09dc2f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -280,7 +280,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
   }
 
 
-  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
+  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.00};
 
   /**
    *
@@ -317,7 +317,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
   * @param requiredPercentage the percentage of max bytes required to make an assignment
    * @param assignAll if true, will assign even if no affinity
    */
-  private void scanAndAssign (Multimap<Integer, 
ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, 
List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double 
requiredPercentage, boolean assignAll) {
+  private void scanAndAssign (Multimap<Integer, 
ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, 
List<DrillbitEndpoint> endpoints,
+                              List<RowGroupInfo> rowGroups, double 
requiredPercentage, boolean assignAll) {
     Collections.sort(rowGroups, new ParquetReadEntryComparator());
     final boolean requireAffinity = requiredPercentage > 0;
     int maxAssignments = (int) (rowGroups.size() / endpoints.size());
@@ -336,7 +337,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
                 (!bytesPerEndpoint.isEmpty() &&
                         (!requireAffinity || haveAffinity) &&
                         (!endpointAssignments.containsKey(minorFragmentId) || 
endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
-                        bytesPerEndpoint.get(currentEndpoint) >= 
rowGroupInfo.getMaxBytes() * requiredPercentage)) {
+                        (!requireAffinity || 
bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * 
requiredPercentage))) {
 
           endpointAssignments.put(minorFragmentId, 
rowGroupInfo.getRowGroupReadEntry());
           logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint 
{}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, 
endpoints.get(minorFragmentId).getAddress());

Reply via email to