Repository: incubator-impala
Updated Branches:
  refs/heads/master 4918b20ac -> dead771d1
IMPALA-4415: Fix unassigned scan range of size 1

ComputeScanRangeAssignment() computes the per-node scan-range load as
the total number of bytes to be scanned / number of nodes, and casts
that to a float. However, that computation suffers from a precision
issue where the average * num_nodes may be 1 byte less than the total.

The scan range assignment loop continues until at least
average * num_nodes bytes have been assigned. If the last scan range
has only 1 byte, it will not be assigned (if it has 2 or more bytes,
it will be considered for assignment before the loop exit condition is
met).

The fix is to make sure that all remaining scan ranges are assigned in
the last iteration of the assignment loop, even if the per-node
threshold is already met.

Testing: No local repro was found; only S3 and LocalFS builds exhibit
the failure. A unit test requires a lot of infrastructure from
simple-scheduler-util that doesn't exist yet (e.g.
ComputeScanRangeAssignment() is not called). S3 and LocalFS full test
builds are in progress.
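
The precision issue described above can be illustrated in isolation. The following is a minimal sketch, not code from the Impala tree: the helper name ThresholdTotalBytes and its parameters are hypothetical, and it assumes the average is held in a single-precision float, as the message states.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of simple-scheduler.cc): returns the cumulative
// byte threshold for instance `i`, computed from a float average in the way the
// commit message describes.
int64_t ThresholdTotalBytes(int64_t total_size, int num_instances, int i) {
  assert(num_instances > 0);
  // A float carries a 24-bit significand, so totals above 2^24 bytes may be
  // rounded when converted.
  float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;
  // Truncate back to an integer byte count, mirroring the assignment to int64_t.
  return static_cast<int64_t>(avg_bytes_per_instance * (i + 1));
}
```

With IEEE-754 single precision and round-to-nearest-even, ThresholdTotalBytes(16777217, 1, 0) yields 16777216, one byte less than the total of 2^24 + 1 bytes. A trailing 1-byte scan range is then never reached by a loop that stops as soon as the threshold is met.
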
Change-Id: Id3af767ee9d121ca62ac383ef9e696a18dc903d6
Reviewed-on: http://gerrit.cloudera.org:8080/4907
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dead771d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dead771d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dead771d

Branch: refs/heads/master
Commit: dead771d1172b6095d19a64cef48ab9adf7ccb09
Parents: 4918b20
Author: Henry Robinson <[email protected]>
Authored: Tue Nov 1 14:36:21 2016 -0700
Committer: Henry Robinson <[email protected]>
Committed: Wed Nov 2 05:27:29 2016 +0000

----------------------------------------------------------------------
 be/src/scheduling/simple-scheduler.cc | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dead771d/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 9ffc273..973e88b 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -511,14 +511,12 @@ void SimpleScheduler::CreateScanInstances(
           schedule->GetNextInstanceId(), host, i, *fragment_params);
       FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();

-      // threshold beyond which we want to assign to the next instance
+      // Threshold beyond which we want to assign to the next instance.
       int64_t threshold_total_bytes = avg_bytes_per_instance * (i + 1);

-      // this will have assigned all scan ranges by the last instance:
-      // for the last instance, threshold_total_bytes == total_size and
-      // total_assigned_bytes won't hit total_size until everything is assigned
-      while (params_idx < params_list.size()
-          && total_assigned_bytes < threshold_total_bytes) {
+      // Assign each scan range in params_list. When the per-instance threshold is
+      // reached, move on to the next instance.
+      while (params_idx < params_list.size()) {
         const TScanRangeParams& scan_range_params = params_list[params_idx];
         instance_params.per_node_scan_ranges[leftmost_scan_id].push_back(
             scan_range_params);
@@ -529,10 +527,19 @@ void SimpleScheduler::CreateScanInstances(
           ++total_assigned_bytes;
         }
         ++params_idx;
+        // If this assignment pushes this instance past the threshold, move on to the next
+        // instance. However, if this is the last instance, assign any remaining scan
+        // ranges here since there are no further instances to load-balance across. There
+        // may be leftover scan ranges because threshold_total_bytes only approximates the
+        // per-node byte threshold.
+        if (total_assigned_bytes >= threshold_total_bytes && i != num_instances - 1) {
+          break;
+        }
       }
-      if (params_idx >= params_list.size()) break; // nothing left to assign
+      if (params_idx == params_list.size()) break; // nothing left to assign
     }
     DCHECK_EQ(params_idx, params_list.size()); // everything got assigned
+    DCHECK_EQ(total_assigned_bytes, total_size);
   }
 }
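
The effect of moving the threshold check inside the loop can be modeled outside the scheduler. The sketch below is a simplified stand-in, not the actual CreateScanInstances() code: each scan range is reduced to its byte length, and CountAssignedRanges, range_lengths, and assign_leftovers_to_last are hypothetical names introduced only for illustration. It assumes avg_bytes_per_instance is a float, as the commit message describes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified model of the assignment loop. Returns how many scan ranges were
// assigned across all instances. When `assign_leftovers_to_last` is false the
// pre-patch loop condition is used; when true, the patched one.
size_t CountAssignedRanges(const std::vector<int64_t>& range_lengths,
                           int num_instances, bool assign_leftovers_to_last) {
  assert(num_instances > 0);
  int64_t total_size = 0;
  for (int64_t len : range_lengths) total_size += len;
  float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;

  size_t params_idx = 0;
  int64_t total_assigned_bytes = 0;
  for (int i = 0; i < num_instances; ++i) {
    // Cumulative threshold; may fall short of total_size due to float rounding.
    int64_t threshold_total_bytes =
        static_cast<int64_t>(avg_bytes_per_instance * (i + 1));
    if (assign_leftovers_to_last) {
      // Patched behavior: the last instance keeps going past the threshold.
      while (params_idx < range_lengths.size()) {
        total_assigned_bytes += range_lengths[params_idx];
        ++params_idx;
        if (total_assigned_bytes >= threshold_total_bytes &&
            i != num_instances - 1) {
          break;
        }
      }
    } else {
      // Pre-patch behavior: every instance stops once the threshold is met.
      while (params_idx < range_lengths.size() &&
             total_assigned_bytes < threshold_total_bytes) {
        total_assigned_bytes += range_lengths[params_idx];
        ++params_idx;
      }
    }
    if (params_idx == range_lengths.size()) break;  // nothing left to assign
  }
  return params_idx;
}
```

For the input {16777216, 1} with a single instance, the pre-patch variant assigns only the first range, while the patched variant assigns both. This mirrors the situation the new DCHECK_EQ(total_assigned_bytes, total_size) is meant to guard against.
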
