[ https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15627397#comment-15627397 ]

ASF GitHub Bot commented on DRILL-4706:
---------------------------------------

Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86060309
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---
    @@ -822,10 +838,103 @@ private void getFiles(String path, List<FileStatus> fileStatuses) throws IOExcep
         }
       }
     
    +  /*
    +   * Figure out the best node to scan each of the rowGroups and update the preferredEndpoint.
    +   * Based on this, update the total work units assigned to the endpoint in the endpointAffinity.
    +   */
    +  private void computeRowGroupAssignment() {
    +    Map<DrillbitEndpoint, Integer> numEndpointAssignments = Maps.newHashMap();
    +    Map<DrillbitEndpoint, Long> numAssignedBytes = Maps.newHashMap();
    +
    +    // Do this for 2 iterations to adjust node assignments after the first iteration.
    +    int numIterations = 2;
    +
    +    while (numIterations-- > 0) {
    +
    +      for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
    +        EndpointByteMap endpointByteMap = rowGroupInfo.getByteMap();
    +
    +        // This can be empty for a local file system, or if no drillbit is
    +        // running on the hosts which have the data.
    +        if (endpointByteMap.isEmpty()) {
    +          continue;
    +        }
    +
    +        // Get the list of endpoints which have maximum (equal) data.
    +        List<DrillbitEndpoint> topEndpoints = endpointByteMap.getTopEndpoints();
    +
    +        long minBytes = 0, numBytes = 0;
    +        DrillbitEndpoint nodePicked = rowGroupInfo.preferredEndpoint;
    +        if (nodePicked != null && numAssignedBytes.containsKey(nodePicked)) {
    +          minBytes = numAssignedBytes.get(nodePicked);
    +        }
    +
    +        DrillbitEndpoint previousNodePicked = nodePicked;
    +
    +        for (DrillbitEndpoint endpoint : topEndpoints) {
    +          if (nodePicked == null) {
    +            nodePicked = endpoint;
    +            if (numAssignedBytes.containsKey(nodePicked)) {
    +              minBytes = numAssignedBytes.get(nodePicked);
    +            }
    +          }
    +
    +          if (numAssignedBytes.containsKey(endpoint)) {
    +            numBytes = numAssignedBytes.get(endpoint);
    +          } else {
    +            numBytes = 0;
    +          }
    +
    +          if (numBytes < minBytes) {
    +            nodePicked = endpoint;
    +            minBytes = numBytes;
    +          }
    +        }
    +
    +        if (nodePicked != null && nodePicked != previousNodePicked) {
    +          numAssignedBytes.put(nodePicked, minBytes + endpointByteMap.get(nodePicked));
    +          if (numEndpointAssignments.containsKey(nodePicked)) {
    +            numEndpointAssignments.put(nodePicked, numEndpointAssignments.get(nodePicked) + 1);
    +          } else {
    +            numEndpointAssignments.put(nodePicked, 1);
    +          }
    +
    +          // If a different node is picked in second iteration, update.
    +          if (previousNodePicked != null) {
    +            numAssignedBytes.put(previousNodePicked,
    +                numAssignedBytes.get(previousNodePicked) - endpointByteMap.get(previousNodePicked));
    +            numEndpointAssignments.put(previousNodePicked,
    +                numEndpointAssignments.get(previousNodePicked) - 1);
    +          }
    +        }
    +        rowGroupInfo.preferredEndpoint = nodePicked;
    +      }
    +    }
    +
    +    // Set the number of local work units for each endpoint in the endpointAffinity.
    +    for (EndpointAffinity epAff : endpointAffinities) {
    +      DrillbitEndpoint endpoint = epAff.getEndpoint();
    +      if (numEndpointAssignments.containsKey(endpoint)) {
    +        epAff.setNumLocalWorkUnits(numEndpointAssignments.get(endpoint));
    +      } else {
    +        epAff.setNumLocalWorkUnits(0);
    --- End diff --
    
    The "else" branch is not required, since numLocalWorkUnits already defaults to 0.
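    Assuming the affinity object initializes its local-work-unit count to 0 (as the comment above states), the else branch can simply be dropped. A minimal standalone sketch of that simplification, using hypothetical stand-in classes rather than the actual Drill types:

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-in for an affinity entry; field and class names
    // here are hypothetical, not the real Drill EndpointAffinity.
    class Affinity {
        final String endpoint;
        int numLocalWorkUnits = 0; // defaults to 0, so no else branch is needed

        Affinity(String endpoint) {
            this.endpoint = endpoint;
        }

        void setNumLocalWorkUnits(int n) {
            this.numLocalWorkUnits = n;
        }
    }

    public class AffinityUpdate {
        // Copy per-endpoint assignment counts onto the affinity objects,
        // touching only endpoints that actually received assignments; the
        // rest keep their default of 0.
        static void apply(Iterable<Affinity> affinities,
                          Map<String, Integer> assignments) {
            for (Affinity aff : affinities) {
                Integer count = assignments.get(aff.endpoint);
                if (count != null) {
                    aff.setNumLocalWorkUnits(count);
                }
                // no else: numLocalWorkUnits already defaults to 0
            }
        }

        public static void main(String[] args) {
            Map<String, Integer> assignments = new HashMap<>();
            assignments.put("nodeA", 3);
            Affinity a = new Affinity("nodeA");
            Affinity b = new Affinity("nodeB");
            apply(java.util.Arrays.asList(a, b), assignments);
            System.out.println(a.numLocalWorkUnits + " " + b.numLocalWorkUnits); // prints "3 0"
        }
    }
    ```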

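    For context, the core of the assignment strategy in the diff above is a greedy min-load pick: among the endpoints holding the most data for a row group, choose the one with the fewest bytes already assigned, so local reads stay balanced across drillbits. A self-contained sketch under that reading (class, method, and node names here are illustrative, not Drill's):

    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of the greedy min-load selection: endpoints are
    // plain strings here instead of DrillbitEndpoint objects.
    public class GreedyPick {
        // Among the candidate endpoints (those holding the most data for a
        // row group), return the one with the fewest bytes already assigned.
        static String pickEndpoint(List<String> topEndpoints,
                                   Map<String, Long> assignedBytes) {
            String picked = null;
            long minBytes = Long.MAX_VALUE;
            for (String endpoint : topEndpoints) {
                long bytes = assignedBytes.getOrDefault(endpoint, 0L);
                if (bytes < minBytes) {
                    picked = endpoint;
                    minBytes = bytes;
                }
            }
            return picked;
        }

        public static void main(String[] args) {
            Map<String, Long> assigned = new HashMap<>();
            assigned.put("nodeA", 512L); // nodeA already has work assigned
            // nodeB has nothing assigned yet, so it should be picked
            System.out.println(pickEndpoint(List.of("nodeA", "nodeB"), assigned)); // prints "nodeB"
        }
    }
    ```

    Balancing assignments this way is what keeps any one drillbit from accumulating all the local row groups, which is the imbalance that otherwise forces remote reads.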

> Fragment planning causes Drillbits to read remote chunks when local copies 
> are available
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-4706
>                 URL: https://issues.apache.org/jira/browse/DRILL-4706
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.6.0
>         Environment: CentOS, RHEL
>            Reporter: Kunal Khatua
>            Assignee: Sorabh Hamirwasia
>              Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single 
> rowgroup and fitting within one chunk) is available on a 10-node setup with 
> replication=3, a pure data scan query causes about 2% of the data to be read 
> remotely. 
> Even with the creation of a metadata cache, the planner selects a 
> sub-optimal plan for executing the SCAN fragments, such that some of the data 
> is served from a remote server. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
