Repository: incubator-impala
Updated Branches:
  refs/heads/master dc2f69e5a -> 985698bb6


IMPALA-4833: Compute precise per-host reservation size, pt2

Addresses some comments from the CR:
https://gerrit.cloudera.org/#/c/7630/8

Notably changes the name of initial_reservation_total_bytes
to initial_reservation_total_claims and attempts to clarify
comments.

Change-Id: I1391d99ca15d2ebcaabd564fbe1242806be09f72
Reviewed-on: http://gerrit.cloudera.org:8080/7681
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/985698bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/985698bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/985698bb

Branch: refs/heads/master
Commit: 985698bb6f69a61bc2bd88b8fb7e89d476d970a5
Parents: dc2f69e
Author: Matthew Jacobs <[email protected]>
Authored: Tue Aug 15 14:24:09 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Aug 18 19:34:35 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc        |  4 ++--
 be/src/runtime/query-state.cc                      |  2 +-
 be/src/scheduling/query-schedule.h                 | 15 ++++++++++-----
 be/src/scheduling/scheduler.cc                     |  4 ++--
 common/thrift/ImpalaInternalService.thrift         | 17 +++++++++++------
 common/thrift/Planner.thrift                       | 12 +++++++-----
 .../org/apache/impala/planner/PlanFragment.java    | 12 ++++++------
 7 files changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index 515b0b3..06fafc5 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -94,8 +94,8 @@ void Coordinator::BackendState::SetRpcParams(
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
   rpc_params->__set_coord_state_idx(state_idx_);
   
rpc_params->__set_min_reservation_bytes(backend_exec_params_->min_reservation_bytes);
-  rpc_params->__set_initial_reservation_total_bytes(
-      backend_exec_params_->initial_reservation_total_bytes);
+  rpc_params->__set_initial_reservation_total_claims(
+      backend_exec_params_->initial_reservation_total_claims);
 
   // set fragment_ctxs and fragment_instance_ctxs
   rpc_params->__isset.fragment_ctxs = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index c279fa0..1c9b03b 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -139,7 +139,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& 
rpc_params) {
   // to handle releasing it if a later step fails.
   initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
       buffer_reservation_, query_mem_tracker_,
-      rpc_params.initial_reservation_total_bytes));
+      rpc_params.initial_reservation_total_claims));
   RETURN_IF_ERROR(
       initial_reservations_->Init(query_id(), 
rpc_params.min_reservation_bytes));
   DCHECK_EQ(0, initial_reservation_refcnt_.Load());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h 
b/be/src/scheduling/query-schedule.h
index adb39ef..28ccd6f 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -53,13 +53,18 @@ struct BackendExecParams {
   /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
   std::vector<const FInstanceExecParams*> instance_params;
 
-  // The minimum reservation size (in bytes) required for all fragments in
-  // instance_params.
+  // The minimum query-wide buffer reservation size (in bytes) required for 
this backend.
+  // This is the peak minimum reservation that may be required by the
+  // concurrently-executing operators at any point in query execution. It may 
be less
+  // than the initial reservation total claims (below) if execution of some 
operators
+  // never overlaps, which allows reuse of reservations.
   int64_t min_reservation_bytes;
 
-  // The total of initial reservations (in bytes) that will be claimed over 
the lifetime
-  // of this query for the fragments in instance_params.
-  int64_t initial_reservation_total_bytes;
+  // Total of the initial buffer reservations that we expect to be claimed on 
this
+  // backend for all fragment instances in instance_params. I.e. the sum over 
all
+  // operators in all fragment instances that execute on this backend. This is 
used for
+  // an optimization in InitialReservations. Measured in bytes.
+  int64_t initial_reservation_total_claims;
 };
 
 /// map from an impalad host address to the list of assigned fragment instance 
params.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 619c89c..1c39230 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -708,8 +708,8 @@ void Scheduler::ComputeBackendExecParams(QuerySchedule* 
schedule) {
       // i.e. that this backend's peak resources is the sum of the 
per-fragment-instance
       // peak resources for the instances executing on this backend.
       be_params.min_reservation_bytes += f.fragment.min_reservation_bytes;
-      be_params.initial_reservation_total_bytes +=
-          f.fragment.initial_reservation_total_bytes;
+      be_params.initial_reservation_total_claims +=
+          f.fragment.initial_reservation_total_claims;
     }
   }
   schedule->set_per_backend_exec_params(per_backend_params);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 7328cc2..b094d3e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -480,14 +480,19 @@ struct TExecQueryFInstancesParams {
   // required in V1
   5: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 
-  // The minimum reservation size (in bytes) required for all fragments in 
fragment_ctxs.
-  // required in V1
+  // The minimum query-wide buffer reservation size (in bytes) required for 
the backend
+  // executing the instances in fragment_instance_ctxs. This is the peak 
minimum
+  // reservation that may be required by the concurrently-executing operators 
at any
+  // point in query execution. It may be less than the initial reservation 
total claims
+  // (below) if execution of some operators never overlaps, which allows reuse 
of
+  // reservations. required in V1
   6: optional i64 min_reservation_bytes
 
-  // The total of initial reservations (in bytes) that will be claimed over 
the lifetime
-  // of this query for the fragments in fragment_ctxs.
-  // required in V1
-  7: optional i64 initial_reservation_total_bytes
+  // Total of the initial buffer reservations that we expect to be claimed on 
this
+  // backend for all fragment instances in fragment_instance_ctxs. I.e. the 
sum over all
+  // operators in all fragment instances that execute on this backend. This is 
used for
+  // an optimization in InitialReservations. Measured in bytes. required in V1
+  7: optional i64 initial_reservation_total_claims
 }
 
 struct TExecQueryFInstancesResult {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 1634dd4..9c1c8e7 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -61,13 +61,15 @@ struct TPlanFragment {
   // output, which is specified by output_sink.output_partitioning.
   6: required Partitions.TDataPartition partition
 
-  // The minimum reservation size (in bytes) required for this plan fragment 
to execute
-  // on a single host.
+  // The minimum reservation size (in bytes) required for an instance of this 
plan
+  // fragment to execute on a single host.
   7: optional i64 min_reservation_bytes
 
-  // The total of initial reservations (in bytes) that will be claimed over 
the lifetime
-  // of this fragment.
-  8: optional i64 initial_reservation_total_bytes
+  // Total of the initial buffer reservations that we expect to be claimed by 
this
+  // fragment. I.e. the sum of the min reservations over all operators 
(including the
+  // sink) in a single instance of this fragment. This is used for an 
optimization in
+  // InitialReservations. Measured in bytes. required in V1
+  8: optional i64 initial_reservation_total_claims
 }
 
 // location information for a single scan range

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/985698bb/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 2f6f7de..ab72863 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -112,7 +112,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
   // The total of initial reservations (in bytes) that will be claimed over 
the lifetime
   // of this fragment. Computed in computeResourceProfile().
-  private long initialReservationTotalBytes_ = -1;
+  private long initialReservationTotalClaims_ = -1;
 
   /**
    * C'tor for fragment with specific partition; the output is by default 
broadcast.
@@ -238,9 +238,9 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     resourceProfile_ =
         planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile);
 
-    initialReservationTotalBytes_ = 
sink_.getResourceProfile().getMinReservationBytes();
+    initialReservationTotalClaims_ = 
sink_.getResourceProfile().getMinReservationBytes();
     for (PlanNode node: collectPlanNodes()) {
-      initialReservationTotalBytes_ +=
+      initialReservationTotalClaims_ +=
           node.getNodeResourceProfile().getMinReservationBytes();
     }
   }
@@ -313,12 +313,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     if (sink_ != null) result.setOutput_sink(sink_.toThrift());
     result.setPartition(dataPartition_.toThrift());
     if (resourceProfile_.isValid()) {
-      Preconditions.checkArgument(initialReservationTotalBytes_ > -1);
+      Preconditions.checkArgument(initialReservationTotalClaims_ > -1);
       
result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes());
-      result.setInitial_reservation_total_bytes(initialReservationTotalBytes_);
+      
result.setInitial_reservation_total_claims(initialReservationTotalClaims_);
     } else {
       result.setMin_reservation_bytes(0);
-      result.setInitial_reservation_total_bytes(0);
+      result.setInitial_reservation_total_claims(0);
     }
     return result;
   }

Reply via email to