IMPALA-3627: Clean up RPC structures in ImpalaInternalService

This change is a prerequisite for IMPALA-2550. It cleans up the structures
passed to the ExecPlanFragment() RPC:

- TQueryCtx is no longer nested inside TPlanFragmentInstanceCtx; it is now
  sent once per TExecPlanFragmentParams and also carries the descriptor
  table (desc_tbl).
- TPlanFragmentExecParams is removed. Its fields (per-node scan ranges,
  per-exchange sender counts, destinations, debug options, request pool,
  sender id) move into TPlanFragmentInstanceCtx, together with the resource
  reservation fields that previously sat directly in TExecPlanFragmentParams.
- A new TPlanFragmentCtx holds state shared by all instances of a fragment:
  the TPlanFragment itself and num_fragment_instances.
- Index names are disambiguated: per_fragment_instance_idx becomes
  fragment_instance_idx (the ordinal of an instance within its fragment),
  and the old query-wide fragment_instance_idx becomes instance_state_idx
  (the index into Coordinator::fragment_instance_states_), which is also
  what TReportExecStatusParams now carries.
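
For reference, the resulting layout of the ExecPlanFragment() request,
abridged from the Thrift changes below (field comments condensed):

  struct TExecPlanFragmentParams {
    1: required ImpalaInternalServiceVersion protocol_version

    // Query-wide context; now also carries the descriptor table (desc_tbl).
    2: optional TQueryCtx query_ctx

    // Shared by all instances of this fragment: the TPlanFragment itself
    // and num_fragment_instances.
    3: optional TPlanFragmentCtx fragment_ctx

    // Per-instance execution parameters: fragment_instance_id,
    // fragment_instance_idx, instance_state_idx, per_node_scan_ranges,
    // per_exch_num_senders, destinations, debug options, request_pool,
    // sender_id, reserved_resource, local_resource_address.
    4: optional TPlanFragmentInstanceCtx fragment_instance_ctx
  }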

Change-Id: I0659c94f6b80bd7bbe0bd150ce243f9efa9a41ad
Reviewed-on: http://gerrit.cloudera.org:8080/3202
Reviewed-by: Lars Volker <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5be7c68e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5be7c68e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5be7c68e

Branch: refs/heads/master
Commit: 5be7c68ed86bee90bec1e9d64ec8829c38d2026d
Parents: 32c40f9
Author: Lars Volker <[email protected]>
Authored: Tue May 24 13:11:18 2016 +0200
Committer: Tim Armstrong <[email protected]>
Committed: Tue May 31 23:32:12 2016 -0700

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                   |   6 +-
 be/src/exec/data-sink.h                    |   7 +-
 be/src/exec/union-node.cc                  |   2 +-
 be/src/runtime/coordinator.cc              | 111 +++++++++++++-----------
 be/src/runtime/coordinator.h               |  13 +--
 be/src/runtime/plan-fragment-executor.cc   |  75 ++++++++--------
 be/src/runtime/plan-fragment-executor.h    |   3 +-
 be/src/runtime/runtime-state.cc            |  12 +--
 be/src/runtime/runtime-state.h             |  12 +--
 be/src/runtime/test-env.cc                 |   4 +-
 be/src/scheduling/query-schedule.h         |   2 +-
 be/src/service/fragment-exec-state.cc      |   4 +-
 be/src/service/fragment-exec-state.h       |  11 +--
 be/src/service/fragment-mgr.cc             |  10 +--
 be/src/service/impala-server.cc            |   4 +-
 common/thrift/ImpalaInternalService.thrift |  95 ++++++++++----------
 common/thrift/PlanNodes.thrift             |   2 +-
 17 files changed, 193 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index dc3a17c..9440368 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -36,7 +36,7 @@ namespace impala {
 
 Status DataSink::CreateDataSink(ObjectPool* pool,
     const TDataSink& thrift_sink, const vector<TExpr>& output_exprs,
-    const TPlanFragmentExecParams& params,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx,
     const RowDescriptor& row_desc, scoped_ptr<DataSink>* sink) {
   DataSink* tmp_sink = NULL;
   switch (thrift_sink.type) {
@@ -47,8 +47,8 @@ Status DataSink::CreateDataSink(ObjectPool* pool,
 
       // TODO: figure out good buffer size based on size of output row
       tmp_sink = new DataStreamSender(pool,
-          params.sender_id, row_desc, thrift_sink.stream_sink,
-          params.destinations, 16 * 1024);
+          fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
+          fragment_instance_ctx.destinations, 16 * 1024);
       sink->reset(tmp_sink);
       break;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 8935727..9c9b708 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -32,7 +32,7 @@ class RuntimeProfile;
 class RuntimeState;
 class TPlanExecRequest;
 class TPlanExecParams;
-class TPlanFragmentExecParams;
+class TPlanFragmentInstanceCtx;
 class RowDescriptor;
 
 /// Superclass of all data sinks.
@@ -66,7 +66,7 @@ class DataSink {
   /// new sink is written to *sink, and is owned by the caller.
   static Status CreateDataSink(ObjectPool* pool,
     const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs,
-    const TPlanFragmentExecParams& params,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx,
     const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink);
 
   /// Returns the runtime profile for the sink.
@@ -77,7 +77,8 @@ class DataSink {
   static void MergeInsertStats(const TInsertStats& src_stats,
       TInsertStats* dst_stats);
 
-  /// Outputs the insert stats contained in the map of insert partition updates to a string
+  /// Outputs the insert stats contained in the map of insert partition updates to a
+  /// string
   static std::string OutputInsertStats(const PartitionStatusMap& stats,
       const std::string& prefix = "");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 42d9b97..2e4cbf5 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -168,7 +168,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos) {
   // Evaluate and materialize the const expr lists exactly once.
   while (const_result_expr_idx_ < const_result_expr_ctx_lists_.size()) {
     // Only evaluate the const expr lists by the first fragment instance.
-    if (state->fragment_ctx().per_fragment_instance_idx == 0) {
+    if (state->fragment_ctx().fragment_instance_idx == 0) {
       // Materialize expr results into row_batch.
       RETURN_IF_ERROR(EvalAndMaterializeExprs(
           const_result_expr_ctx_lists_[const_result_expr_idx_], true, &tuple,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7cb6c97..6f7368c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -95,24 +95,24 @@ namespace impala {
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
 
-// container for debug options in TPlanFragmentExecParams (debug_node, debug_action,
+// container for debug options in TPlanFragmentInstanceCtx (debug_node, debug_action,
 // debug_phase)
 struct DebugOptions {
-  int fragment_instance_idx;
+  int instance_state_idx;
   int node_id;
   TDebugAction::type action;
   TExecNodePhase::type phase;  // INVALID: debug options invalid
 
   DebugOptions()
-    : fragment_instance_idx(-1), node_id(-1), action(TDebugAction::WAIT),
+    : instance_state_idx(-1), node_id(-1), action(TDebugAction::WAIT),
       phase(TExecNodePhase::INVALID) {}
 
   // If these debug options apply to the candidate fragment instance, returns true
   // otherwise returns false.
   bool IsApplicable(int candidate_fragment_instance_idx) {
     if (phase == TExecNodePhase::INVALID) return false;
-    return (fragment_instance_idx == -1 ||
-        fragment_instance_idx == candidate_fragment_instance_idx);
+    return (instance_state_idx == -1 ||
+        instance_state_idx == candidate_fragment_instance_idx);
   }
 };
 
@@ -360,12 +360,12 @@ static void ProcessQueryOptions(
   split(components, query_options.debug_action, is_any_of(":"), 
token_compress_on);
   if (components.size() < 3 || components.size() > 4) return;
   if (components.size() == 3) {
-    debug_options->fragment_instance_idx = -1;
+    debug_options->instance_state_idx = -1;
     debug_options->node_id = atoi(components[0].c_str());
     debug_options->phase = GetExecNodePhase(components[1]);
     debug_options->action = GetDebugAction(components[2]);
   } else {
-    debug_options->fragment_instance_idx = atoi(components[0].c_str());
+    debug_options->instance_state_idx = atoi(components[0].c_str());
     debug_options->node_id = atoi(components[1].c_str());
     debug_options->phase = GetExecNodePhase(components[2]);
     debug_options->action = GetDebugAction(components[3]);
@@ -552,7 +552,7 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* 
schedule) {
   query_events_->MarkEvent(
       Substitute("Ready to start $0 remote fragments", 
num_fragment_instances));
 
-  int fragment_instance_idx = 0;
+  int instance_state_idx = 0;
   bool has_coordinator_fragment =
       request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
   int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0;
@@ -565,13 +565,13 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* 
schedule) {
       int num_hosts = params->hosts.size();
       DCHECK_GT(num_hosts, 0);
       UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, 
num_hosts,
-          fragment_instance_idx);
-      fragment_instance_idx += num_hosts;
+          instance_state_idx);
+      instance_state_idx += num_hosts;
     }
     MarkFilterRoutingTableComplete();
   }
 
-  fragment_instance_idx = 0;
+  instance_state_idx = 0;
   // Start one fragment instance per fragment per host (number of hosts 
running each
   // fragment may not be constant).
   for (int fragment_idx = first_remote_fragment_idx;
@@ -584,24 +584,24 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* 
schedule) {
     // schedule. Each fragment instance is assigned a unique ID, numbered from 
0, with
     // instances for fragment ID 0 being assigned IDs [0 .. 
num_hosts(fragment_id_0)] and
     // so on.
-    for (int per_fragment_instance_idx = 0; per_fragment_instance_idx < num_hosts;
-         ++per_fragment_instance_idx) {
+    for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts;
+         ++fragment_instance_idx) {
       DebugOptions* fragment_instance_debug_options =
-          debug_options.IsApplicable(fragment_instance_idx) ? &debug_options : NULL;
+          debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
       exec_env_->fragment_exec_thread_pool()->Offer(
           bind<void>(mem_fn(&Coordinator::ExecRemoteFragment), this,
               params, // fragment_exec_params
               &request.fragments[fragment_idx], // plan_fragment,
               fragment_instance_debug_options,
               schedule,
-              fragment_instance_idx++,
+              instance_state_idx++,
               fragment_idx,
-              per_fragment_instance_idx));
+              fragment_instance_idx));
     }
   }
   exec_complete_barrier_->Wait();
   query_events_->MarkEvent(
-      Substitute("All $0 remote fragments started", fragment_instance_idx));
+      Substitute("All $0 remote fragments started", instance_state_idx));
 
   Status status = Status::OK();
   const TMetricDef& def =
@@ -1360,23 +1360,24 @@ int64_t Coordinator::ComputeTotalScanRangesComplete(int 
node_id) {
 
 void Coordinator::ExecRemoteFragment(const FragmentExecParams* 
fragment_exec_params,
     const TPlanFragment* plan_fragment, DebugOptions* debug_options,
-    QuerySchedule* schedule, int fragment_instance_idx, int fragment_idx,
-    int per_fragment_instance_idx) {
+    QuerySchedule* schedule, int instance_state_idx, int fragment_idx,
+    int fragment_instance_idx) {
   NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
   TExecPlanFragmentParams rpc_params;
   SetExecPlanFragmentParams(*schedule, *plan_fragment, *fragment_exec_params,
-      fragment_instance_idx, fragment_idx, per_fragment_instance_idx,
+      instance_state_idx, fragment_idx, fragment_instance_idx,
       MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port), &rpc_params);
   if (debug_options != NULL) {
-    rpc_params.params.__set_debug_node_id(debug_options->node_id);
-    rpc_params.params.__set_debug_action(debug_options->action);
-    rpc_params.params.__set_debug_phase(debug_options->phase);
+    rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
+    rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
+    rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
   }
   FragmentInstanceState* exec_state = obj_pool()->Add(
-      new FragmentInstanceState(fragment_idx, fragment_exec_params,
-          per_fragment_instance_idx, obj_pool()));
-  exec_state->ComputeTotalSplitSize(rpc_params.params.per_node_scan_ranges);
-  fragment_instance_states_[fragment_instance_idx] = exec_state;
+      new FragmentInstanceState(fragment_idx, fragment_exec_params, fragment_instance_idx,
+          obj_pool()));
+  exec_state->ComputeTotalSplitSize(
+      rpc_params.fragment_instance_ctx.per_node_scan_ranges);
+  fragment_instance_states_[instance_state_idx] = exec_state;
   VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_
             << " instance_id=" << exec_state->fragment_instance_id()
             << " host=" << exec_state->impalad_address();
@@ -1510,13 +1511,13 @@ Status Coordinator::UpdateFragmentExecStatus(const 
TReportExecStatusParams& para
   VLOG_FILE << "UpdateFragmentExecStatus() query_id=" << query_id_
             << " status=" << params.status.status_code
             << " done=" << (params.done ? "true" : "false");
-  uint32_t fragment_instance_idx = params.fragment_instance_idx;
-  if (fragment_instance_idx >= fragment_instance_states_.size()) {
+  uint32_t instance_state_idx = params.instance_state_idx;
+  if (instance_state_idx >= fragment_instance_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown fragment instance index $0 (max known: $1)",
-            fragment_instance_idx, fragment_instance_states_.size() - 1));
+            instance_state_idx, fragment_instance_states_.size() - 1));
   }
-  FragmentInstanceState* exec_state = fragment_instance_states_[fragment_instance_idx];
+  FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx];
 
   const TRuntimeProfileTree& cumulative_profile = params.profile;
   Status status(params.status);
@@ -1611,7 +1612,7 @@ Status Coordinator::UpdateFragmentExecStatus(const 
TReportExecStatusParams& para
     lock_guard<mutex> l(lock_);
     exec_state->stopwatch()->Stop();
     DCHECK_GT(num_remaining_fragment_instances_, 0);
-    VLOG_QUERY << "Fragment instance " << params.fragment_instance_idx << "("
+    VLOG_QUERY << "Fragment instance " << params.instance_state_idx << "("
                << exec_state->fragment_instance_id() << ") on host "
                << exec_state->impalad_address() << " completed, "
                << num_remaining_fragment_instances_ - 1 << " remaining: 
query_id="
@@ -1857,13 +1858,18 @@ string Coordinator::GetErrorLog() {
 
 void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
     const TPlanFragment& fragment, const FragmentExecParams& params,
-    int fragment_instance_idx, int fragment_idx, int per_fragment_instance_idx,
+    int instance_state_idx, int fragment_idx, int fragment_instance_idx,
     const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) {
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
-  rpc_params->__set_fragment(fragment);
+  rpc_params->__set_query_ctx(query_ctx_);
+
+  TPlanFragmentCtx fragment_ctx;
+  TPlanFragmentInstanceCtx fragment_instance_ctx;
+
+  fragment_ctx.__set_fragment(fragment);
   // Remove filters that weren't selected during filter routing table 
construction.
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    for (TPlanNode& plan_node: rpc_params->fragment.plan.nodes) {
+    for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
       if (plan_node.__isset.runtime_filters) {
         vector<TRuntimeFilterDesc> required_filters;
         for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) {
@@ -1872,7 +1878,7 @@ void 
Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
           if (filter_it == filter_routing_table_.end()) continue;
           FilterState* f = &filter_it->second;
           if (plan_node.__isset.hash_join_node) {
-            if (f->src_fragment_instance_idxs.find(fragment_instance_idx) ==
+            if (f->src_fragment_instance_idxs.find(instance_state_idx) ==
                 f->src_fragment_instance_idxs.end()) {
               DCHECK(desc.is_broadcast_join);
               continue;
@@ -1889,7 +1895,7 @@ void 
Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
   }
   SetExecPlanDescriptorTable(fragment, rpc_params);
 
-  TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx];
+  TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
   if (schedule.HasReservation()) {
     // The reservation has already have been validated at this point.
     TNetworkAddress resource_hostport;
@@ -1900,33 +1906,32 @@ void 
Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
     // fragment. Otherwise, don't set it (usually this the coordinator 
fragment), and it
     // won't participate in dynamic RM controls.
     if (it != schedule.reservation()->allocated_resources.end()) {
-      rpc_params->__set_reserved_resource(it->second);
-      rpc_params->__set_local_resource_address(resource_hostport);
+      fragment_instance_ctx.__set_reserved_resource(it->second);
+      fragment_instance_ctx.__set_local_resource_address(resource_hostport);
     }
   }
-  rpc_params->params.__set_request_pool(schedule.request_pool());
   FragmentScanRangeAssignment::const_iterator it =
       params.scan_range_assignment.find(exec_host);
   // Scan ranges may not always be set, so use an empty structure if so.
   const PerNodeScanRanges& scan_ranges =
       (it != params.scan_range_assignment.end()) ? it->second : 
PerNodeScanRanges();
 
-  rpc_params->params.__set_per_node_scan_ranges(scan_ranges);
-  rpc_params->params.__set_per_exch_num_senders(params.per_exch_num_senders);
-  rpc_params->params.__set_destinations(params.destinations);
-  rpc_params->params.__set_sender_id(params.sender_id_base + per_fragment_instance_idx);
-  rpc_params->__isset.params = true;
-  rpc_params->fragment_instance_ctx.__set_query_ctx(query_ctx_);
-  rpc_params->fragment_instance_ctx.fragment_instance_id =
-      params.instance_ids[per_fragment_instance_idx];
-  rpc_params->fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx;
-  rpc_params->fragment_instance_ctx.num_fragment_instances = params.instance_ids.size();
-  rpc_params->fragment_instance_ctx.fragment_instance_idx = fragment_instance_idx;
-  rpc_params->__isset.fragment_instance_ctx = true;
+  fragment_ctx.num_fragment_instances = params.instance_ids.size();
+  fragment_instance_ctx.__set_request_pool(schedule.request_pool());
+  fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges);
+  fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders);
+  fragment_instance_ctx.__set_destinations(params.destinations);
+  fragment_instance_ctx.__set_sender_id(params.sender_id_base + fragment_instance_idx);
+  fragment_instance_ctx.fragment_instance_id = params.instance_ids[fragment_instance_idx];
+  fragment_instance_ctx.fragment_instance_idx = fragment_instance_idx;
+  fragment_instance_ctx.instance_state_idx = instance_state_idx;
+  rpc_params->__set_fragment_ctx(fragment_ctx);
+  rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
 }
 
 void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
     TExecPlanFragmentParams* rpc_params) {
+  DCHECK(rpc_params->__isset.query_ctx);
   TDescriptorTable thrift_desc_tbl;
 
   // Always add the Tuple and Slot descriptors.
@@ -1991,7 +1996,7 @@ void Coordinator::SetExecPlanDescriptorTable(const 
TPlanFragment& fragment,
     thrift_desc_tbl.__isset.tableDescriptors = true;
   }
 
-  rpc_params->__set_desc_tbl(thrift_desc_tbl);
+  rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl);
 }
 namespace {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 578c61d..b4eab0e 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -425,14 +425,15 @@ class Coordinator {
   void MarkFilterRoutingTableComplete();
 
   /// Fill in rpc_params based on parameters.
-  /// 'fragment_instance_idx' is the 0-based query-wide ordinal of the fragment instance.
+  /// 'instance_state_idx' is the index of the fragment instance state in
+  /// fragment_instance_states_.
   /// 'fragment_idx' is the 0-based query-wide ordinal of the fragment of which it is an
   /// instance.
-  /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment
+  /// 'fragment_instance_idx' is the 0-based ordinal of this particular fragment
   /// instance within its fragment.
   void SetExecPlanFragmentParams(QuerySchedule& schedule, const TPlanFragment& fragment,
-      const FragmentExecParams& params, int fragment_instance_idx, int fragment_idx,
-      int per_fragment_instance_idx, const TNetworkAddress& coord,
+      const FragmentExecParams& params, int instance_state_idx, int fragment_idx,
+      int fragment_instance_idx, const TNetworkAddress& coord,
       TExecPlanFragmentParams* rpc_params);
 
   /// Wrapper for ExecPlanFragment() RPC. This function will be called in 
parallel from
@@ -440,8 +441,8 @@ class Coordinator {
   /// fragment_instance_states_, then calls RPC to issue fragment on remote 
impalad.
   void ExecRemoteFragment(const FragmentExecParams* fragment_exec_params,
       const TPlanFragment* plan_fragment, DebugOptions* debug_options,
-      QuerySchedule* schedule, int fragment_instance_idx, int fragment_idx,
-      int per_fragment_instance_idx);
+      QuerySchedule* schedule, int instance_state_idx, int fragment_idx,
+      int fragment_instance_idx);
 
   /// Determine fragment number, given fragment id.
   int GetFragmentNum(const TUniqueId& fragment_id);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc 
b/be/src/runtime/plan-fragment-executor.cc
index 75c5f58..ef39206 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -88,20 +88,23 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
   is_prepared_ = true;
   // TODO: Break this method up.
   fragment_sw_.Start();
-  const TPlanFragmentExecParams& params = request.params;
-  query_id_ = request.fragment_instance_ctx.query_ctx.query_id;
+  const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx;
+  query_id_ = request.query_ctx.query_id;
 
   VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id="
              << PrintId(request.fragment_instance_ctx.fragment_instance_id);
-  VLOG(2) << "params:\n" << ThriftDebugString(params);
+  VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx);
 
-  if (request.__isset.reserved_resource) {
+  DCHECK(request.__isset.fragment_ctx);
+  bool request_has_reserved_resource =
+      request.fragment_instance_ctx.__isset.reserved_resource;
+  if (request_has_reserved_resource) {
     VLOG_QUERY << "Executing fragment in reserved resource:\n"
-               << request.reserved_resource;
+               << request.fragment_instance_ctx.reserved_resource;
   }
 
   string cgroup = "";
-  if (FLAGS_enable_rm && request.__isset.reserved_resource) {
+  if (FLAGS_enable_rm && request_has_reserved_resource) {
     cgroup = exec_env_->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, 
"_"));
   }
 
@@ -114,34 +117,34 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
   SCOPED_TIMER(profile()->total_time_counter());
 
   // Register after setting runtime_state_ to ensure proper cleanup.
-  if (FLAGS_enable_rm && !cgroup.empty() && request.__isset.reserved_resource) {
+  if (FLAGS_enable_rm && !cgroup.empty() && request_has_reserved_resource) {
     bool is_first;
     RETURN_IF_ERROR(exec_env_->cgroups_mgr()->RegisterFragment(
         request.fragment_instance_ctx.fragment_instance_id, cgroup, 
&is_first));
     // The first fragment using cgroup sets the cgroup's CPU shares based on 
the reserved
     // resource.
     if (is_first) {
-      DCHECK(request.__isset.reserved_resource);
+      DCHECK(request_has_reserved_resource);
       int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares(
-          request.reserved_resource.v_cpu_cores);
+          request.fragment_instance_ctx.reserved_resource.v_cpu_cores);
       RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, 
cpu_shares));
     }
   }
 
   // TODO: Find the reservation id when the resource request is not set
-  if (FLAGS_enable_rm && request.__isset.reserved_resource) {
+  if (FLAGS_enable_rm && request_has_reserved_resource) {
     TUniqueId reservation_id;
-    reservation_id << request.reserved_resource.reservation_id;
+    reservation_id << request.fragment_instance_ctx.reserved_resource.reservation_id;
 
     // TODO: Combine this with RegisterFragment() etc.
     QueryResourceMgr* res_mgr;
     bool is_first = 
exec_env_->resource_broker()->GetQueryResourceMgr(query_id_,
-        reservation_id, request.local_resource_address, &res_mgr);
+        reservation_id, request.fragment_instance_ctx.local_resource_address, &res_mgr);
     DCHECK(res_mgr != NULL);
     runtime_state_->SetQueryResourceMgr(res_mgr);
     if (is_first) {
       runtime_state_->query_resource_mgr()->InitVcoreAcquisition(
-          request.reserved_resource.v_cpu_cores);
+          request.fragment_instance_ctx.reserved_resource.v_cpu_cores);
     }
   }
 
@@ -155,9 +158,11 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
   }
 
   int64_t rm_reservation_size_bytes = -1;
-  if (request.__isset.reserved_resource && request.reserved_resource.memory_mb > 0) {
-    rm_reservation_size_bytes =
-        static_cast<int64_t>(request.reserved_resource.memory_mb) * 1024L * 1024L;
+  if (request_has_reserved_resource &&
+      request.fragment_instance_ctx.reserved_resource.memory_mb > 0) {
+    int64_t rm_reservation_size_mb =
+      static_cast<int64_t>(request.fragment_instance_ctx.reserved_resource.memory_mb);
+    rm_reservation_size_bytes = rm_reservation_size_mb * 1024L * 1024L;
     // Queries that use more than the hard limit will be killed, so it's not 
useful to
     // have a reservation larger than the hard limit. Clamp reservation bytes 
limit to the
     // hard limit (if it exists).
@@ -171,8 +176,8 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
                << PrettyPrinter::Print(rm_reservation_size_bytes, 
TUnit::BYTES);
   }
 
-  DCHECK(!params.request_pool.empty());
-  runtime_state_->InitMemTrackers(query_id_, &params.request_pool,
+  DCHECK(!fragment_instance_ctx.request_pool.empty());
+  runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool,
       bytes_limit, rm_reservation_size_bytes);
   RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
 
@@ -197,25 +202,26 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
 
   // set up desc tbl
   DescriptorTbl* desc_tbl = NULL;
-  DCHECK(request.__isset.desc_tbl);
+  DCHECK(request.__isset.query_ctx);
+  DCHECK(request.query_ctx.__isset.desc_tbl);
   RETURN_IF_ERROR(
-      DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl));
+      DescriptorTbl::Create(obj_pool(), request.query_ctx.desc_tbl, &desc_tbl));
   runtime_state_->set_desc_tbl(desc_tbl);
   VLOG_QUERY << "descriptor table for fragment="
              << request.fragment_instance_ctx.fragment_instance_id
              << "\n" << desc_tbl->DebugString();
 
   // set up plan
-  DCHECK(request.__isset.fragment);
-  RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(), request.fragment.plan,
-      *desc_tbl, &plan_));
+  DCHECK(request.__isset.fragment_ctx);
+  RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(),
+      request.fragment_ctx.fragment.plan, *desc_tbl, &plan_));
   runtime_state_->set_fragment_root_id(plan_->id());
 
-  if (request.params.__isset.debug_node_id) {
-    DCHECK(request.params.__isset.debug_action);
-    DCHECK(request.params.__isset.debug_phase);
-    ExecNode::SetDebugOptions(request.params.debug_node_id,
-        request.params.debug_phase, request.params.debug_action, plan_);
+  if (fragment_instance_ctx.__isset.debug_node_id) {
+    DCHECK(fragment_instance_ctx.__isset.debug_action);
+    DCHECK(fragment_instance_ctx.__isset.debug_phase);
+    ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id,
+        fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, plan_);
   }
 
   // set #senders of exchange nodes before calling Prepare()
@@ -224,7 +230,7 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
   for (ExecNode* exch_node: exch_nodes)
   {
     DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
-    int num_senders = FindWithDefault(params.per_exch_num_senders,
+    int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders,
         exch_node->id(), 0);
     DCHECK_GT(num_senders, 0);
     static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
@@ -237,7 +243,7 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
   for (int i = 0; i < scan_nodes.size(); ++i) {
     ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
     const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
-        params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
+        fragment_instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
     scan_node->SetScanRanges(scan_ranges);
   }
 
@@ -247,13 +253,14 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
     RETURN_IF_ERROR(plan_->Prepare(runtime_state_.get()));
   }
 
-  PrintVolumeIds(params.per_node_scan_ranges);
+  PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges);
 
   // set up sink, if required
-  if (request.fragment.__isset.output_sink) {
+  if (request.fragment_ctx.fragment.__isset.output_sink) {
     RETURN_IF_ERROR(DataSink::CreateDataSink(
-        obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
-        params, row_desc(), &sink_));
+        obj_pool(), request.fragment_ctx.fragment.output_sink,
+        request.fragment_ctx.fragment.output_exprs,
+        fragment_instance_ctx, row_desc(), &sink_));
     RETURN_IF_ERROR(sink_->Prepare(runtime_state()));
 
     RuntimeProfile* sink_profile = sink_->profile();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h 
b/be/src/runtime/plan-fragment-executor.h
index 29250a3..082442e 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -40,7 +40,6 @@ class RuntimeState;
 class TRowBatch;
 class TPlanExecRequest;
 class TPlanFragment;
-class TPlanFragmentExecParams;
 class TPlanExecParams;
 
 /// PlanFragmentExecutor handles all aspects of the execution of a single plan 
fragment,
@@ -242,7 +241,7 @@ class PlanFragmentExecutor {
 
   ObjectPool* obj_pool() { return runtime_state_->obj_pool(); }
 
-  /// typedef for TPlanFragmentExecParams.per_node_scan_ranges
+  /// typedef for TPlanFragmentInstanceCtx.per_node_scan_ranges
   typedef std::map<TPlanNodeId, std::vector<TScanRangeParams> > 
PerNodeScanRanges;
 
   /// Main loop of profile reporting thread.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index c970163..3701907 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -68,8 +68,8 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& 
fragment_params,
     const string& cgroup, ExecEnv* exec_env)
   : obj_pool_(new ObjectPool()),
     fragment_params_(fragment_params),
-    now_(new TimestampValue(fragment_ctx().query_ctx.now_string.c_str(),
-        fragment_ctx().query_ctx.now_string.size())),
+    now_(new TimestampValue(query_ctx().now_string.c_str(),
+        query_ctx().now_string.size())),
     cgroup_(cgroup),
     codegen_expr_(false),
     profile_(obj_pool_.get(),
@@ -77,7 +77,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& 
fragment_params,
     is_cancelled_(false),
     query_resource_mgr_(NULL),
     root_node_id_(-1),
-    filter_bank_(new RuntimeFilterBank(fragment_ctx().query_ctx, this)) {
+    filter_bank_(new RuntimeFilterBank(query_ctx(), this)) {
   Status status = Init(exec_env);
   DCHECK(status.ok()) << status.GetDetail();
 }
@@ -93,8 +93,8 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
     query_resource_mgr_(NULL),
     root_node_id_(-1),
     filter_bank_(new RuntimeFilterBank(query_ctx, this)) {
-  fragment_params_.fragment_instance_ctx.__set_query_ctx(query_ctx);
-  fragment_params_.fragment_instance_ctx.query_ctx.request.query_options
+  fragment_params_.__set_query_ctx(query_ctx);
+  fragment_params_.query_ctx.request.query_options
       .__set_batch_size(DEFAULT_BATCH_SIZE);
 }
 
@@ -119,7 +119,7 @@ Status RuntimeState::Init(ExecEnv* exec_env) {
   SCOPED_TIMER(profile_.total_time_counter());
   exec_env_ = exec_env;
   TQueryOptions& query_options =
-      fragment_params_.fragment_instance_ctx.query_ctx.request.query_options;
+      fragment_params_.query_ctx.request.query_options;
 
   // max_errors does not indicate how many errors in total have been recorded, 
but rather
   // how many are distinct. It is defined as the sum of the number of generic 
errors and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 15c8d9c..22cecaf 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -100,7 +100,7 @@ class RuntimeState {
   bool abort_on_default_limit_exceeded() const {
     return query_ctx().request.query_options.abort_on_default_limit_exceeded;
   }
-  const TQueryCtx& query_ctx() const { return fragment_ctx().query_ctx; }
+  const TQueryCtx& query_ctx() const { return fragment_params_.query_ctx; }
   const TPlanFragmentInstanceCtx& fragment_ctx() const {
     return fragment_params_.fragment_instance_ctx;
   }
@@ -243,9 +243,9 @@ class RuntimeState {
   Status SetMemLimitExceeded(MemTracker* tracker = NULL,
       int64_t failed_allocation_size = 0, const ErrorMsg* msg = NULL);
 
-  /// Returns a non-OK status if query execution should stop (e.g., the query was cancelled
-  /// or a mem limit was exceeded). Exec nodes should check this periodically so execution
-  /// doesn't continue if the query terminates abnormally.
+  /// Returns a non-OK status if query execution should stop (e.g., the query was
+  /// cancelled or a mem limit was exceeded). Exec nodes should check this periodically so
+  /// execution doesn't continue if the query terminates abnormally.
   Status CheckQueryState();
 
   QueryResourceMgr* query_resource_mgr() const { return query_resource_mgr_; }
@@ -337,8 +337,8 @@ class RuntimeState {
   SpinLock query_status_lock_;
   Status query_status_;
 
-  /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by the
-  /// ResourceBroker instead.
+  /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by
+  /// the ResourceBroker instead.
   QueryResourceMgr* query_resource_mgr_;
 
   /// Reader contexts that need to be closed when the fragment is closed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 39fc7cd..223ecad 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -61,8 +61,8 @@ TestEnv::~TestEnv() {
 
 RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) {
   TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
-  plan_params.fragment_instance_ctx.query_ctx.query_id.hi = 0;
-  plan_params.fragment_instance_ctx.query_ctx.query_id.lo = query_id;
+  plan_params.query_ctx.query_id.hi = 0;
+  plan_params.query_ctx.query_id.lo = query_id;
   return new RuntimeState(plan_params, "", exec_env_.get());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h 
b/be/src/scheduling/query-schedule.h
index 5c891a8..20f7dfe 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -42,7 +42,7 @@ typedef boost::unordered_map<TNetworkAddress, 
PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
 /// execution parameters for a single fragment; used to assemble the
-/// per-fragment instance TPlanFragmentExecParams;
+/// TPlanFragmentInstanceCtx;
 /// hosts.size() == instance_ids.size()
 struct FragmentExecParams {
   std::vector<TNetworkAddress> hosts; // execution backends

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc 
b/be/src/service/fragment-exec-state.cc
index 38866f1..e80cd78 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -77,8 +77,8 @@ void FragmentMgr::FragmentExecState::ReportStatusCb(
 
   TReportExecStatusParams params;
   params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_query_id(fragment_instance_ctx_.query_ctx.query_id);
-  params.__set_fragment_instance_idx(fragment_instance_ctx_.fragment_instance_idx);
+  params.__set_query_id(query_ctx_.query_id);
+  params.__set_instance_state_idx(fragment_instance_ctx_.instance_state_idx);
+  params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id);
   exec_status.SetTStatus(&params);
   params.__set_done(done);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/fragment-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.h 
b/be/src/service/fragment-exec-state.h
index c940386..8c36198 100644
--- a/be/src/service/fragment-exec-state.h
+++ b/be/src/service/fragment-exec-state.h
@@ -29,7 +29,7 @@ namespace impala {
 class FragmentMgr::FragmentExecState {
  public:
   FragmentExecState(const TExecPlanFragmentParams& params, ExecEnv* exec_env)
-    : fragment_instance_ctx_(params.fragment_instance_ctx),
+    : query_ctx_(params.query_ctx), fragment_instance_ctx_(params.fragment_instance_ctx),
       executor_(exec_env, boost::bind<void>(
           boost::mem_fn(&FragmentMgr::FragmentExecState::ReportStatusCb),
               this, _1, _2, _3)),
@@ -50,17 +50,13 @@ class FragmentMgr::FragmentExecState {
   /// Main loop of plan fragment execution. Blocks until execution finishes.
   void Exec();
 
-  const TUniqueId& query_id() const {
-    return fragment_instance_ctx_.query_ctx.query_id;
-  }
+  const TUniqueId& query_id() const { return query_ctx_.query_id; }
 
   const TUniqueId& fragment_instance_id() const {
     return fragment_instance_ctx_.fragment_instance_id;
   }
 
-  const TNetworkAddress& coord_address() const {
-    return fragment_instance_ctx_.query_ctx.coord_address;
-  }
+  const TNetworkAddress& coord_address() const { return query_ctx_.coord_address; }
 
   /// Set the execution thread, taking ownership of the object.
   void set_exec_thread(Thread* exec_thread) { exec_thread_.reset(exec_thread); 
}
@@ -69,6 +65,7 @@ class FragmentMgr::FragmentExecState {
   void PublishFilter(int32_t filter_id, const TBloomFilter& 
thrift_bloom_filter);
 
  private:
+  TQueryCtx query_ctx_;
   TPlanFragmentInstanceCtx fragment_instance_ctx_;
   PlanFragmentExecutor executor_;
   ImpalaBackendClientCache* client_cache_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index f64ca7e..8472832 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -36,9 +36,9 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad 
will output memory
 Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& 
exec_params) {
   VLOG_QUERY << "ExecPlanFragment() instance_id="
              << exec_params.fragment_instance_ctx.fragment_instance_id
-             << " coord=" << exec_params.fragment_instance_ctx.query_ctx.coord_address
+             << " coord=" << exec_params.query_ctx.coord_address
              << " fragment instance#="
-             << exec_params.fragment_instance_ctx.fragment_instance_idx;
+             << exec_params.fragment_instance_ctx.instance_state_idx;
 
   // Preparing and opening the fragment creates a thread and consumes a 
non-trivial
   // amount of memory. If we are already starved for memory, cancel the 
fragment as
@@ -48,13 +48,13 @@ Status FragmentMgr::ExecPlanFragment(const 
TExecPlanFragmentParams& exec_params)
     string msg = Substitute("Instance $0 of plan fragment $1 of query $2 could 
not "
         "start because the backend Impala daemon is over its memory limit",
         PrintId(exec_params.fragment_instance_ctx.fragment_instance_id),
-        exec_params.fragment.display_name,
-        PrintId(exec_params.fragment_instance_ctx.query_ctx.query_id));
+        exec_params.fragment_ctx.fragment.display_name,
+        PrintId(exec_params.query_ctx.query_id));
     return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
   }
 
   // Remote fragments must always have a sink. Remove when IMPALA-2905 is 
resolved.
-  DCHECK(exec_params.fragment.__isset.output_sink);
+  DCHECK(exec_params.fragment_ctx.fragment.__isset.output_sink);
 
   shared_ptr<FragmentExecState> exec_state(
       new FragmentExecState(exec_params, ExecEnv::GetInstance()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e4a08ea..53e7dde 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1087,7 +1087,7 @@ Status ImpalaServer::GetSessionState(const TUniqueId& 
session_id,
 void ImpalaServer::ReportExecStatus(
     TReportExecStatusResult& return_val, const TReportExecStatusParams& 
params) {
   VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
-            << " fragment instance#=" << params.fragment_instance_idx
+            << " fragment instance#=" << params.instance_state_idx
             << " instance_id=" << params.fragment_instance_id
             << " done=" << (params.done ? "true" : "false");
   // TODO: implement something more efficient here, we're currently
@@ -1104,7 +1104,7 @@ void ImpalaServer::ReportExecStatus(
     const string& err = Substitute("ReportExecStatus(): Received report for 
unknown "
         "query ID (probably closed or cancelled). (query_id: $0, backend: $1, 
instance:"
         " $2 done: $3)", PrintId(params.query_id),
-        params.fragment_instance_idx, PrintId(params.fragment_instance_id),
+        params.instance_state_idx, PrintId(params.fragment_instance_id),
         params.done);
     return_val.status.error_msgs.push_back(err);
     // TODO: Re-enable logging when this only happens once per fragment.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 55dd838..cfe0080 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -284,25 +284,19 @@ struct TQueryCtx {
   // results returned from multiple scan nodes are consistent.
   // This defaults to -1 when no timestamp is specified.
   11: optional i64 snapshot_timestamp = -1;
-}
-
-// Context of a fragment instance, including its unique id, the total number
-// of fragment instances, the query context, the coordinator address, etc.
-struct TPlanFragmentInstanceCtx {
-  // context of the query this fragment instance belongs to
-  1: required TQueryCtx query_ctx
 
-  // the globally unique fragment instance id
-  2: required Types.TUniqueId fragment_instance_id
+  // Contains only the union of those descriptors referenced by list of fragments destined
+  // for a single host. Optional for frontend tests.
+  12: optional Descriptors.TDescriptorTable desc_tbl
+}
 
-  // ordinal of this fragment instance, range [0, num_fragment_instances)
-  3: required i32 per_fragment_instance_idx
+// Context to collect information, which is shared among all instances of that plan
+// fragment.
+struct TPlanFragmentCtx {
+  1: required Planner.TPlanFragment fragment
 
   // total number of instances of this fragment
-  4: required i32 num_fragment_instances
-
-  // Index of this fragment instance across all combined instances in this query.
-  5: required i32 fragment_instance_idx
+  2: required i32 num_fragment_instances
 }
 
 // A scan range plus the parameters needed to execute that scan.
@@ -322,35 +316,53 @@ struct TPlanFragmentDestination {
   2: required Types.TNetworkAddress server
 }
 
-// Parameters for a single execution instance of a particular TPlanFragment
+// Execution parameters of a fragment instance, including its unique id, the 
total number
+// of fragment instances, the query context, the coordinator address, etc.
 // TODO: for range partitioning, we also need to specify the range boundaries
-struct TPlanFragmentExecParams {
-  // initial scan ranges for each scan node in TPlanFragment.plan_tree
-  1: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
+struct TPlanFragmentInstanceCtx {
+  // the globally unique fragment instance id
+  1: required Types.TUniqueId fragment_instance_id
+
+  // Index of this fragment instance across all instances of its parent fragment,
+  // range [0, TPlanFragmentCtx.num_fragment_instances).
+  2: required i32 fragment_instance_idx
+
+  // Index of this fragment instance in Coordinator::fragment_instance_states_.
+  3: required i32 instance_state_idx
+
+  // Initial scan ranges for each scan node in TPlanFragment.plan_tree
+  4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
 
-  // number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
+  // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
   // needed to create a DataStreamRecvr
-  2: required map<Types.TPlanNodeId, i32> per_exch_num_senders
+  5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
 
   // Output destinations, one per output partition.
   // The partitioning of the output is specified by
   // TPlanFragment.output_sink.output_partition.
   // The number of output partitions is destinations.size().
-  3: list<TPlanFragmentDestination> destinations
+  6: list<TPlanFragmentDestination> destinations
 
   // Debug options: perform some action in a particular phase of a particular node
-  4: optional Types.TPlanNodeId debug_node_id
-  5: optional PlanNodes.TExecNodePhase debug_phase
-  6: optional PlanNodes.TDebugAction debug_action
+  7: optional Types.TPlanNodeId debug_node_id
+  8: optional PlanNodes.TExecNodePhase debug_phase
+  9: optional PlanNodes.TDebugAction debug_action
 
   // The pool to which this request has been submitted. Used to update pool statistics
   // for admission control.
-  7: optional string request_pool
+  10: optional string request_pool
 
   // Id of this fragment in its role as a sender.
-  8: optional i32 sender_id
+  11: optional i32 sender_id
+
+  // Resource reservation to run this plan fragment in.
+  12: optional Llama.TAllocatedResource reserved_resource
+
+  // Address of local node manager (used for expanding resource allocations)
+  13: optional Types.TNetworkAddress local_resource_address
 }
 
+
 // Service Protocol Details
 
 enum ImpalaInternalServiceVersion {
@@ -363,25 +375,15 @@ enum ImpalaInternalServiceVersion {
 struct TExecPlanFragmentParams {
   1: required ImpalaInternalServiceVersion protocol_version
 
-  // required in V1
-  2: optional Planner.TPlanFragment fragment
+  // Context of the query, which this fragment is part of.
+  2: optional TQueryCtx query_ctx
 
-  // required in V1
-  // Contains only those descriptors referenced by fragment's scan nodes and data sink
-  3: optional Descriptors.TDescriptorTable desc_tbl
+  // Context of this fragment.
+  3: optional TPlanFragmentCtx fragment_ctx
 
-  // required in V1
-  4: optional TPlanFragmentExecParams params
-
-  // Context of this fragment, including its instance id, the total number fragment
-  // instances, the query context, etc.
-  5: optional TPlanFragmentInstanceCtx fragment_instance_ctx
-
-  // Resource reservation to run this plan fragment in.
-  6: optional Llama.TAllocatedResource reserved_resource
-
-  // Address of local node manager (used for expanding resource allocations)
-  7: optional Types.TNetworkAddress local_resource_address
+  // Context of this fragment instance, including its instance id, the total number
+  // fragment instances, the query context, etc.
+  4: optional TPlanFragmentInstanceCtx fragment_instance_ctx
 }
 
 struct TExecPlanFragmentResult {
@@ -450,9 +452,10 @@ struct TReportExecStatusParams {
   // required in V1
   2: optional Types.TUniqueId query_id
 
-  // passed into ExecPlanFragment() as TPlanFragmentInstanceCtx.backend_num
   // required in V1
-  3: optional i32 fragment_instance_idx
+  // Used to look up the fragment instance state in the coordinator, same value as
+  // TExecPlanFragmentParams.instance_state_idx.
+  3: optional i32 instance_state_idx
 
   // required in V1
   4: optional Types.TUniqueId fragment_instance_id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5be7c68e/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index d095047..6db725c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -16,7 +16,7 @@
 // This file contains all structs, enums, etc., that together make up
 // a plan tree. All information recorded in struct TPlan and below is independent
 // of the execution parameters of any one of the backends on which it is running
-// (those are recorded in TPlanFragmentExecParams).
+// (those are recorded in TPlanFragmentInstanceCtx).
 
 namespace cpp impala
 namespace java com.cloudera.impala.thrift
