Repository: incubator-impala Updated Branches: refs/heads/master dc2f69e5a -> 985698bb6
IMPALA-4833: Compute precise per-host reservation size, pt2 Addresses some comments from the CR: https://gerrit.cloudera.org/#/c/7630/8 Notably changes the name of initial_reservation_total_bytes to initial_reservation_total_claims and attempts to clarify comments. Change-Id: I1391d99ca15d2ebcaabd564fbe1242806be09f72 Reviewed-on: http://gerrit.cloudera.org:8080/7681 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/985698bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/985698bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/985698bb Branch: refs/heads/master Commit: 985698bb6f69a61bc2bd88b8fb7e89d476d970a5 Parents: dc2f69e Author: Matthew Jacobs <[email protected]> Authored: Tue Aug 15 14:24:09 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Aug 18 19:34:35 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator-backend-state.cc | 4 ++-- be/src/runtime/query-state.cc | 2 +- be/src/scheduling/query-schedule.h | 15 ++++++++++----- be/src/scheduling/scheduler.cc | 4 ++-- common/thrift/ImpalaInternalService.thrift | 17 +++++++++++------ common/thrift/Planner.thrift | 12 +++++++----- .../org/apache/impala/planner/PlanFragment.java | 12 ++++++------ 7 files changed, 39 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 515b0b3..06fafc5 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -94,8 +94,8 @@ void Coordinator::BackendState::SetRpcParams( rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1); rpc_params->__set_coord_state_idx(state_idx_); rpc_params->__set_min_reservation_bytes(backend_exec_params_->min_reservation_bytes); - rpc_params->__set_initial_reservation_total_bytes( - backend_exec_params_->initial_reservation_total_bytes); + rpc_params->__set_initial_reservation_total_claims( + backend_exec_params_->initial_reservation_total_claims); // set fragment_ctxs and fragment_instance_ctxs rpc_params->__isset.fragment_ctxs = true; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index c279fa0..1c9b03b 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -139,7 +139,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) { // to handle releasing it if a later step fails. initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_, buffer_reservation_, query_mem_tracker_, - rpc_params.initial_reservation_total_bytes)); + rpc_params.initial_reservation_total_claims)); RETURN_IF_ERROR( initial_reservations_->Init(query_id(), rpc_params.min_reservation_bytes)); DCHECK_EQ(0, initial_reservation_refcnt_.Load()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index adb39ef..28ccd6f 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -53,13 +53,18 @@ struct BackendExecParams { /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_. std::vector<const FInstanceExecParams*> instance_params; - // The minimum reservation size (in bytes) required for all fragments in - // instance_params. + // The minimum query-wide buffer reservation size (in bytes) required for this backend. + // This is the peak minimum reservation that may be required by the + // concurrently-executing operators at any point in query execution. It may be less + // than the initial reservation total claims (below) if execution of some operators + // never overlaps, which allows reuse of reservations. int64_t min_reservation_bytes; - // The total of initial reservations (in bytes) that will be claimed over the lifetime - // of this query for the fragments in instance_params. - int64_t initial_reservation_total_bytes; + // Total of the initial buffer reservations that we expect to be claimed on this + // backend for all fragment instances in instance_params. I.e. the sum over all + // operators in all fragment instances that execute on this backend. This is used for + // an optimization in InitialReservation. Measured in bytes. + int64_t initial_reservation_total_claims; }; /// map from an impalad host address to the list of assigned fragment instance params. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 619c89c..1c39230 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -708,8 +708,8 @@ void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) { // i.e. that this backend's peak resources is the sum of the per-fragment-instance // peak resources for the instances executing on this backend. be_params.min_reservation_bytes += f.fragment.min_reservation_bytes; - be_params.initial_reservation_total_bytes += - f.fragment.initial_reservation_total_bytes; + be_params.initial_reservation_total_claims += + f.fragment.initial_reservation_total_claims; } } schedule->set_per_backend_exec_params(per_backend_params); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 7328cc2..b094d3e 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -480,14 +480,19 @@ struct TExecQueryFInstancesParams { // required in V1 5: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs - // The minimum reservation size (in bytes) required for all fragments in fragment_ctxs. - // required in V1 + // The minimum query-wide buffer reservation size (in bytes) required for the backend + // executing the instances in fragment_instance_ctxs. This is the peak minimum + // reservation that may be required by the concurrently-executing operators at any + // point in query execution. It may be less than the initial reservation total claims + // (below) if execution of some operators never overlaps, which allows reuse of + // reservations. required in V1 6: optional i64 min_reservation_bytes - // The total of initial reservations (in bytes) that will be claimed over the lifetime - // of this query for the fragments in fragment_ctxs. - // required in V1 - 7: optional i64 initial_reservation_total_bytes + // Total of the initial buffer reservations that we expect to be claimed on this + // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all + // operators in all fragment instances that execute on this backend. This is used for + // an optimization in InitialReservation. Measured in bytes. required in V1 + 7: optional i64 initial_reservation_total_claims } struct TExecQueryFInstancesResult { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/common/thrift/Planner.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift index 1634dd4..9c1c8e7 100644 --- a/common/thrift/Planner.thrift +++ b/common/thrift/Planner.thrift @@ -61,13 +61,15 @@ struct TPlanFragment { // output, which is specified by output_sink.output_partitioning. 6: required Partitions.TDataPartition partition - // The minimum reservation size (in bytes) required for this plan fragment to execute - // on a single host. + // The minimum reservation size (in bytes) required for an instance of this plan + // fragment to execute on a single host. 7: optional i64 min_reservation_bytes - // The total of initial reservations (in bytes) that will be claimed over the lifetime - // of this fragment. - 8: optional i64 initial_reservation_total_bytes + // Total of the initial buffer reservations that we expect to be claimed by this + // fragment. I.e. the sum of the min reservations over all operators (including the + // sink) in a single instance of this fragment. This is used for an optimization in + // InitialReservation. Measured in bytes. required in V1 + 8: optional i64 initial_reservation_total_claims } // location information for a single scan range http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/fe/src/main/java/org/apache/impala/planner/PlanFragment.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index 2f6f7de..ab72863 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -112,7 +112,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { // The total of initial reservations (in bytes) that will be claimed over the lifetime // of this fragment. Computed in computeResourceProfile(). - private long initialReservationTotalBytes_ = -1; + private long initialReservationTotalClaims_ = -1; /** * C'tor for fragment with specific partition; the output is by default broadcast. @@ -238,9 +238,9 @@ public class PlanFragment extends TreeNode<PlanFragment> { resourceProfile_ = planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile); - initialReservationTotalBytes_ = sink_.getResourceProfile().getMinReservationBytes(); + initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes(); for (PlanNode node: collectPlanNodes()) { - initialReservationTotalBytes_ += + initialReservationTotalClaims_ += node.getNodeResourceProfile().getMinReservationBytes(); } } @@ -313,12 +313,12 @@ public class PlanFragment extends TreeNode<PlanFragment> { if (sink_ != null) result.setOutput_sink(sink_.toThrift()); result.setPartition(dataPartition_.toThrift()); if (resourceProfile_.isValid()) { - Preconditions.checkArgument(initialReservationTotalBytes_ > -1); + Preconditions.checkArgument(initialReservationTotalClaims_ > -1); result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes()); - result.setInitial_reservation_total_bytes(initialReservationTotalBytes_); + result.setInitial_reservation_total_claims(initialReservationTotalClaims_); } else { result.setMin_reservation_bytes(0); - result.setInitial_reservation_total_bytes(0); + result.setInitial_reservation_total_claims(0); } return result; }
