http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 20a2af5..bc635b1 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,7 +38,6 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
-#include "runtime/query-state.h"
 #include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
 #include "scheduling/query-schedule.h"
 #include "util/histogram-metric.h"
@@ -52,7 +51,6 @@ class DataStreamMgr;
 class DataSink;
 class RowBatch;
 class RowDescriptor;
-class PlanFragmentExecutor;
 class ObjectPool;
 class RuntimeState;
 class Expr;
@@ -71,8 +69,8 @@ class QueryResultSet;
 class MemTracker;
 class PlanRootSink;
 class FragmentInstanceState;
+class QueryState;
 
-struct DebugOptions;
 
 /// Query coordinator: handles execution of fragment instances on remote 
nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the 
executing
@@ -94,31 +92,36 @@ struct DebugOptions;
 /// A typical sequence of calls for a single query (calls under the same 
numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
-/// 2. client: Wait()/client: Cancel()/backend: UpdateFragmentExecStatus()
-/// 3. client: GetNext()*/client: Cancel()/backend: UpdateFragmentExecStatus()
+/// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
+/// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
 ///
 /// The implementation ensures that setting an overall error status and 
initiating
-/// cancellation of local and all remote fragments is atomic.
+/// cancellation of all fragment instances is atomic.
 ///
+/// TODO: remove TearDown() and replace with ReleaseResources(); TearDown() 
currently
+/// also disassembles the control structures (such as the local reference to 
the
+/// coordinator's FragmentInstanceState)
 /// TODO: move into separate subdirectory and move nested classes into 
separate files
 /// and unnest them
+/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
+/// TODO: clarify cancellation path; in particular, cancel as soon as we return
+/// all results
 class Coordinator { // NOLINT: The member variables could be re-ordered to 
save space
  public:
-  Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
-      RuntimeProfile::EventSequence* events);
+  Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* 
events);
   ~Coordinator();
 
   /// Initiate asynchronous execution of a query with the given schedule. When 
it returns,
   /// all fragment instances have started executing at their respective 
backends.
   /// A call to Exec() must precede all other member function calls.
-  Status Exec();
+  Status Exec() WARN_UNUSED_RESULT;
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if 
the
   /// query doesn't return rows, until the query finishes or is cancelled.
   /// A call to Wait() must precede all calls to GetNext().
   /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
   /// Wait() calls concurrently.
-  Status Wait();
+  Status Wait() WARN_UNUSED_RESULT;
 
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 
'max_rows'
   /// rows, but will not return more.
@@ -134,7 +137,7 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent 
GetNext()
   /// calls (but may call any of the other member functions concurrently with 
GetNext()).
-  Status GetNext(QueryResultSet* results, int max_rows, bool* eos);
+  Status GetNext(QueryResultSet* results, int max_rows, bool* eos) 
WARN_UNUSED_RESULT;
 
   /// Cancel execution of query. This includes the execution of the local plan 
fragment,
   /// if any, as well as all plan fragments on remote nodes. Sets 
query_status_ to the
@@ -142,14 +145,11 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// Idempotent.
   void Cancel(const Status* cause = NULL);
 
-  /// Updates status and query execution metadata of a particular
-  /// fragment; if 'status' is an error status or if 'done' is true,
-  /// considers the plan fragment to have finished execution. Assumes
-  /// that calls to UpdateFragmentExecStatus() won't happen
-  /// concurrently for the same backend.
-  /// If 'status' is an error status, also cancel execution of the query via a 
call
-  /// to CancelInternal().
-  Status UpdateFragmentExecStatus(const TReportExecStatusParams& params);
+  /// Updates execution status of a particular backend as well as 
Insert-related
+  /// status (per_partition_status_ and files_to_move_). Also updates
+  /// num_remaining_backends_ and cancels execution if the backend has an 
error status.
+  Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
+      WARN_UNUSED_RESULT;
 
   /// Returns the query state.
   /// Only valid to call after Exec() and before TearDown(). The returned
@@ -172,9 +172,9 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// the future if not all fragments have finished execution.
   RuntimeProfile* query_profile() const { return query_profile_.get(); }
 
-  const TUniqueId& query_id() const { return query_id_; }
+  const TUniqueId& query_id() const;
 
-  MemTracker* query_mem_tracker() const { return 
query_state()->query_mem_tracker(); }
+  MemTracker* query_mem_tracker() const;
 
   /// This is safe to call only after Wait()
   const PartitionStatusMap& per_partition_status() { return 
per_partition_status_; }
@@ -198,15 +198,8 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// Returns query_status_.
   Status GetStatus();
 
-  /// Returns the exec summary. The exec summary lock must already have been 
taken.
-  /// The caller must not block while holding the lock.
-  const TExecSummary& exec_summary() const {
-    exec_summary_lock_.DCheckLocked();
-    return exec_summary_;
-  }
-
-  /// See the ImpalaServer class comment for the required lock acquisition 
order.
-  SpinLock& GetExecSummaryLock() const { return exec_summary_lock_; }
+  /// Get a copy of the current exec summary. Thread-safe.
+  void GetTExecSummary(TExecSummary* exec_summary);
 
   /// Receive a local filter update from a fragment instance. Aggregate that 
filter update
   /// with others for the same filter ID into a global filter. If all updates 
for that
@@ -214,54 +207,53 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// filter to fragment instances.
   void UpdateFilter(const TUpdateFilterParams& params);
 
-  /// Called once the query is complete to tear down any remaining state.
+  /// Called once query execution is complete to tear down any remaining state.
+  /// TODO: change to ReleaseResources() and don't tear down control 
structures.
   void TearDown();
 
  private:
-  class InstanceState;
+  class BackendState;
   struct FilterTarget;
   class FilterState;
-
-  /// Typedef for boost utility to compute averaged stats
-  /// TODO: including the median doesn't compile, looks like some includes are 
missing
-  typedef boost::accumulators::accumulator_set<int64_t,
-      boost::accumulators::features<
-      boost::accumulators::tag::min,
-      boost::accumulators::tag::max,
-      boost::accumulators::tag::mean,
-      boost::accumulators::tag::variance>
-  > SummaryStats;
+  class FragmentStats;
 
   const QuerySchedule schedule_;
-  ExecEnv* exec_env_;
-  TUniqueId query_id_;
 
   /// copied from TQueryExecRequest; constant across all fragments
-  TDescriptorTable desc_tbl_;
   TQueryCtx query_ctx_;
 
   /// copied from TQueryExecRequest, governs when to call ReportQuerySummary
   TStmtType::type stmt_type_;
 
-  /// map from id of a scan node to a specific counter in the node's profile
-  typedef std::map<PlanNodeId, RuntimeProfile::Counter*> CounterMap;
+  /// BackendStates for all execution backends, including the coordinator.
+  /// All elements are non-nullptr. Owned by obj_pool(). Populated by
+  /// InitBackendStates().
+  std::vector<BackendState*> backend_states_;
 
-  /// Struct for per fragment instance counters that will be aggregated by the 
coordinator.
-  struct FragmentInstanceCounters {
-    /// Throughput counters per node
-    CounterMap throughput_counters;
+  /// Index into backend_states_ for coordinator fragment; -1 if no coordinator 
fragment
+  int coord_backend_idx_ = -1;
 
-    /// Total finished scan ranges per node
-    CounterMap scan_ranges_complete_counters;
-  };
+  /// The QueryState for this coordinator. Set in Exec(). Released in 
TearDown().
+  QueryState* query_state_ = nullptr;
+
+  /// Non-null if and only if the query produces results for the client; i.e. 
is of
+  /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree 
and return
+  /// them to the client in GetNext(), and also to access the fragment 
instance's runtime
+  /// state.
+  ///
+  /// Result rows are materialized by this fragment instance in its own 
thread. They are
+  /// materialized into a QueryResultSet provided to the coordinator during 
GetNext().
+  ///
+  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
+  /// reference of QueryState released) in TearDown().
+  FragmentInstanceState* coord_instance_ = nullptr;
 
-  /// InstanceStates for all fragment instances, including that of the 
coordinator
-  /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in
-  /// StartFInstances().
-  std::vector<InstanceState*> fragment_instance_states_;
+  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() 
or when
+  /// GetNext() hits eos.
+  PlanRootSink* coord_sink_ = nullptr;
 
   /// True if the query needs a post-execution step to tidy up
-  bool needs_finalization_;
+  bool needs_finalization_ = false;
 
   /// Only valid if needs_finalization is true
   TFinalizeParams finalize_params_;
@@ -269,17 +261,52 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// ensures single-threaded execution of Wait(); must not hold lock_ when 
acquiring this
   boost::mutex wait_lock_;
 
-  bool has_called_wait_;  // if true, Wait() was called; protected by 
wait_lock_
+  bool has_called_wait_ = false;  // if true, Wait() was called; protected by 
wait_lock_
 
   /// Keeps track of number of completed ranges and total scan ranges.
   ProgressUpdater progress_;
 
+  /// Total number of filter updates received (always 0 if filter mode is not
+  /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
+  RuntimeProfile::Counter* filter_updates_received_ = nullptr;
+
+  /// The filtering mode for this query. Set in constructor.
+  TRuntimeFilterMode::type filter_mode_;
+
+  /// Tracks the memory consumed by runtime filters during aggregation. Child 
of
+  /// the query mem tracker in 'query_state_' and set in Exec().
+  std::unique_ptr<MemTracker> filter_mem_tracker_;
+
+  /// Object pool owned by the coordinator.
+  boost::scoped_ptr<ObjectPool> obj_pool_;
+
+  /// Execution summary for a single query.
+  /// A wrapper around TExecSummary, with supporting structures.
+  struct ExecSummary {
+    TExecSummary thrift_exec_summary;
+
+    /// See the ImpalaServer class comment for the required lock acquisition 
order.
+    /// The caller must not block while holding the lock.
+    SpinLock lock;
+
+    /// A mapping of plan node ids to index into thrift_exec_summary.nodes
+    boost::unordered_map<TPlanNodeId, int> node_id_to_idx_map;
+
+    void Init(const QuerySchedule& query_schedule);
+  };
+
+  ExecSummary exec_summary_;
+
+  /// Aggregate counters for the entire query.
+  boost::scoped_ptr<RuntimeProfile> query_profile_;
+
   /// Protects all fields below. This is held while making RPCs, so this lock 
should
   /// only be acquired if the acquiring thread is prepared to wait for a 
significant
   /// time.
+  /// TODO: clarify to what extent the fields below need to be protected by 
lock_
   /// Lock ordering is
   /// 1. lock_
-  /// 2. InstanceState::lock_
+  /// 2. BackendState::lock_
   boost::mutex lock_;
 
   /// Overall status of the entire query; set to the first reported fragment 
error
@@ -289,54 +316,17 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// If true, the query is done returning all results.  It is possible that 
the
   /// coordinator still needs to wait for cleanup on remote fragments (e.g. 
queries
   /// with limit)
-  /// Once this is set to true, errors from remote fragments are ignored.
-  bool returned_all_results_;
-
-  /// The QueryState for this coordinator. Set in Exec(). Released in 
TearDown().
-  QueryState* query_state_;
-
-  /// Non-null if and only if the query produces results for the client; i.e. 
is of
-  /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree 
and return
-  /// them to the client in GetNext(), and also to access the fragment 
instance's runtime
-  /// state.
-  ///
-  /// Result rows are materialized by this fragment instance in its own 
thread. They are
-  /// materialized into a QueryResultSet provided to the coordinator during 
GetNext().
-  ///
-  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
-  /// reference of QueryState released) in TearDown().
-  FragmentInstanceState* coord_instance_ = nullptr;
-
-  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() 
or when
-  /// GetNext() hits eos.
-  PlanRootSink* coord_sink_ = nullptr;
-
-  /// owned by plan root, which resides in runtime_state_'s pool
-  const RowDescriptor* row_desc_;
-
-  /// Returns a local object pool.
-  ObjectPool* obj_pool() { return obj_pool_.get(); }
-
-  PlanFragmentExecutor* executor();
-
-  // Sets the TDescriptorTable(s) for the current fragment.
-  void SetExecPlanDescriptorTable(const TPlanFragment& fragment,
-      TExecPlanFragmentParams* rpc_params);
-
-  /// True if execution has completed, false otherwise.
-  bool execution_completed_;
-
-  /// Number of remote fragments that have completed
-  int num_remote_fragements_complete_;
+  /// Once this is set to true, errors from execution backends are ignored.
+  bool returned_all_results_ = false;
 
   /// If there is no coordinator fragment, Wait() simply waits until all
-  /// backends report completion by notifying on instance_completion_cv_.
+  /// backends report completion by notifying on backend_completion_cv_.
   /// Tied to lock_.
-  boost::condition_variable instance_completion_cv_;
+  boost::condition_variable backend_completion_cv_;
 
   /// Count of the number of backends for which done != true. When this
   /// hits 0, any Wait()'ing thread is notified
-  int num_remaining_fragment_instances_;
+  int num_remaining_backends_ = 0;
 
   /// The following two structures, partition_row_counts_ and files_to_move_ 
are filled in
   /// as the query completes, and track the results of INSERT queries that 
alter the
@@ -353,64 +343,15 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// empty string for the destination means that a file is to be deleted.
   FileMoveMap files_to_move_;
 
-  /// Object pool owned by the coordinator. Any executor will have its own 
pool.
-  boost::scoped_ptr<ObjectPool> obj_pool_;
-
-  /// Execution summary for this query.
-  /// See the ImpalaServer class comment for the required lock acquisition 
order.
-  mutable SpinLock exec_summary_lock_;
-  TExecSummary exec_summary_;
-
-  /// A mapping of plan node ids to index into exec_summary_.nodes
-  boost::unordered_map<TPlanNodeId, int> plan_node_id_to_summary_map_;
-
-  /// Aggregate counters for the entire query.
-  boost::scoped_ptr<RuntimeProfile> query_profile_;
-
-  /// Event timeline for this query. Unowned.
-  RuntimeProfile::EventSequence* query_events_;
-
-  /// Per fragment profile information
-  struct PerFragmentProfileData {
-    /// Averaged profile for this fragment.  Stored in obj_pool.
-    /// The counters in this profile are averages (type AveragedCounter) of the
-    /// counters in the fragment instance profiles.
-    /// Note that the individual fragment instance profiles themselves are 
stored and
-    /// displayed as children of the root_profile below.
-    RuntimeProfile* averaged_profile;
-
-    /// Number of instances running this fragment.
-    int num_instances;
+  /// Event timeline for this query. Not owned.
+  RuntimeProfile::EventSequence* query_events_ = nullptr;
 
-    /// Root profile for all fragment instances for this fragment
-    RuntimeProfile* root_profile;
+  /// Indexed by fragment idx (TPlanFragment.idx). Filled in 
InitFragmentStats(),
+  /// elements live in obj_pool().
+  std::vector<FragmentStats*> fragment_stats_;
 
-    /// Bytes assigned for instances of this fragment
-    SummaryStats bytes_assigned;
-
-    /// Completion times for instances of this fragment
-    SummaryStats completion_times;
-
-    /// Execution rates for instances of this fragment
-    SummaryStats rates;
-
-    PerFragmentProfileData()
-      : averaged_profile(nullptr), num_instances(-1), root_profile(nullptr) {}
-  };
-
-  /// This is indexed by fragment idx (TPlanFragment.idx).
-  /// This array is only modified at coordinator startup and query completion 
and
-  /// does not need locks.
-  std::vector<PerFragmentProfileData> fragment_profiles_;
-
-  /// Throughput counters for the coordinator fragment
-  FragmentInstanceCounters coordinator_counters_;
-
-  /// The set of hosts that the query will run on. Populated in Exec.
-  boost::unordered_set<TNetworkAddress> unique_hosts_;
-
-  /// Total time spent in finalization (typically 0 except for INSERT into 
hdfs tables)
-  RuntimeProfile::Counter* finalization_timer_;
+  /// total time spent in finalization (typically 0 except for INSERT into 
hdfs tables)
+  RuntimeProfile::Counter* finalization_timer_ = nullptr;
 
   /// Barrier that is released when all calls to ExecRemoteFragment() have
   /// returned, successfully or not. Initialised during Exec().
@@ -425,54 +366,20 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
 
   /// Set to true when all calls to UpdateFilterRoutingTable() have finished, 
and it's
   /// safe to concurrently read from filter_routing_table_.
-  bool filter_routing_table_complete_;
-
-  /// Total number of filter updates received (always 0 if filter mode is not
-  /// GLOBAL). Excludes repeated broadcast filter updates.
-  RuntimeProfile::Counter* filter_updates_received_;
-
-  /// The filtering mode for this query. Set in constructor.
-  TRuntimeFilterMode::type filter_mode_;
-
-  /// Tracks the memory consumed by runtime filters during aggregation. Child 
of
-  /// the query mem tracker in 'query_state_'.
-  std::unique_ptr<MemTracker> filter_mem_tracker_;
+  bool filter_routing_table_complete_ = false;
 
   /// True if and only if TearDown() has been called.
-  bool torn_down_;
+  bool torn_down_ = false;
+
+  /// Returns a local object pool.
+  ObjectPool* obj_pool() { return obj_pool_.get(); }
 
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
-  /// Sets 'filter_routing_table_complete_' and prints the table to the 
profile and log.
-  void MarkFilterRoutingTableComplete();
-
-  /// Fill in rpc_params based on params.
-  void SetExecPlanFragmentParams(
-      const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params);
-
-  /// Wrapper for ExecPlanFragment() RPC. This function will be called in 
parallel from
-  /// multiple threads.
-  void ExecRemoteFInstance(
-      const FInstanceExecParams& exec_params, const DebugOptions* 
debug_options);
-
-  /// Determine fragment number, given fragment id.
-  int GetFragmentNum(const TUniqueId& fragment_id);
-
-  /// Print hdfs split size stats to VLOG_QUERY and details to VLOG_FILE
-  /// Attaches split size summary to the appropriate runtime profile
-  void PrintFragmentInstanceInfo();
-
-  /// Collect scan node counters from the profile.
-  /// Assumes lock protecting profile and result is held.
-  void CollectScanNodeCounters(RuntimeProfile*, FragmentInstanceCounters* 
result);
-
-  /// Runs cancel logic. Assumes that lock_ is held.
-  void CancelInternal();
-
   /// Cancels all fragment instances. Assumes that lock_ is held. This may be 
called when
   /// the query is not being cancelled in the case where the query limit is 
reached.
-  void CancelFragmentInstances();
+  void CancelInternal();
 
   /// Acquires lock_ and updates query_status_ with 'status' if it's not 
already
   /// an error status, and returns the current query_status_.
@@ -480,45 +387,31 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// failed_fragment is the fragment_id that has failed, used for error 
reporting along
   /// with instance_hostname.
   Status UpdateStatus(const Status& status, const TUniqueId& failed_fragment,
-      const std::string& instance_hostname);
+      const std::string& instance_hostname) WARN_UNUSED_RESULT;
+
+  /// Update per_partition_status_ and files_to_move_.
+  void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);
 
-  /// Returns only when either all fragment instances have reported success or 
the query
+  /// Returns only when either all execution backends have reported success or 
the query
   /// is in error. Returns the status of the query.
   /// It is safe to call this concurrently, but any calls must be made only 
after Exec().
-  /// WaitForAllInstances may be called before Wait(), but note that Wait() 
guarantees
-  /// that any coordinator fragment has finished, which this method does not.
-  Status WaitForAllInstances();
+  Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
 
-  /// Perform any post-query cleanup required. Called by Wait() only after all 
fragment
-  /// instances have returned, or if the query has failed, in which case it 
only cleans up
-  /// temporary data rather than finishing the INSERT in flight.
-  Status FinalizeQuery();
-
-  /// Moves all temporary staging files to their final destinations.
-  Status FinalizeSuccessfulInsert();
-
-  /// Initializes the structures in fragment_profiles_. Must be called before 
RPCs to
-  /// start remote fragments.
-  void InitExecProfiles();
-
-  /// Initialize the structures to collect execution summary of every plan node
-  /// (exec_summary_ and plan_node_id_to_summary_map_)
-  void InitExecSummary();
+  /// Initializes fragment_stats_ and query_profile_. Must be called before
+  /// InitBackendStates().
+  void InitFragmentStats();
 
-  /// Update fragment profile information from a fragment instance state.
-  void UpdateAverageProfile(InstanceState* instance_state);
+  /// Populates backend_states_ based on schedule_.fragment_exec_params().
+  /// BackendState depends on fragment_stats_, which is why InitFragmentStats()
+  /// must be called before this function.
+  void InitBackendStates();
 
-  /// Compute the summary stats (completion_time and rates)
-  /// for an individual fragment_profile_ based on the specified instance 
state.
-  void ComputeFragmentSummaryStats(InstanceState* instance_state);
+  /// Computes execution summary info strings for fragment_stats_ and 
query_profile_.
+  /// This is assumed to be called at the end of a query -- remote fragments'
+  /// profiles must not be updated while this is running.
+  void ComputeQuerySummary();
 
-  /// Outputs aggregate query profile summary.  This is assumed to be called 
at the end of
-  /// a query -- remote fragments' profiles must not be updated while this is 
running.
-  void ReportQuerySummary();
-
-  /// Populates the summary execution stats from the profile. Can only be 
called when the
-  /// query is done.
-  void UpdateExecSummary(const InstanceState& instance_state);
+  /// TODO: move the next 3 functions into a separate class
 
   /// Determines what the permissions of directories created by INSERT 
statements should
   /// be if permission inheritance is enabled. Populates a map from all 
prefixes of
@@ -534,19 +427,26 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
       PermissionCache* permissions_cache);
 
-  /// Starts all fragment instances contained in the schedule by issuing RPCs 
in
-  /// parallel and then waiting for all of the RPCs to complete. Also sets up 
and
-  /// registers the state for all fragment instances.
-  void StartFInstances();
+  /// Moves all temporary staging files to their final destinations.
+  Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT;
+
+  /// Perform any post-query cleanup required. Called by Wait() only after all 
fragment
+  /// instances have returned, or if the query has failed, in which case it 
only cleans up
+  /// temporary data rather than finishing the INSERT in flight.
+  Status FinalizeQuery() WARN_UNUSED_RESULT;
+
+  /// Populates backend_states_, starts query execution at all backends in 
parallel, and
+  /// blocks until startup completes.
+  void StartBackendExec();
 
-  /// Calls CancelInternal() and returns an error if there was any error 
starting the
-  /// fragments.
+  /// Calls CancelInternal() and returns an error if there was any error 
starting
+  /// backend execution.
   /// Also updates query_profile_ with the startup latency histogram.
-  Status FinishInstanceStartup();
+  Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and 
collecting the
   /// filters that they either produce or consume.
-  void UpdateFilterRoutingTable(const FragmentExecParams& fragment_params);
+  void InitFilterRoutingTable();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index f2785cd..4cb2bf0 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -81,20 +81,16 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
   ImpalaTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
   virtual ~ImpalaTestBackend() {}
 
-  virtual void ExecPlanFragment(
-      TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& 
params) {}
-
-  virtual void ReportExecStatus(
-      TReportExecStatusResult& return_val, const TReportExecStatusParams& 
params) {}
-
-  virtual void CancelPlanFragment(
-      TCancelPlanFragmentResult& return_val, const TCancelPlanFragmentParams& 
params) {}
-
-  virtual void UpdateFilter(
-      TUpdateFilterResult& return_val, const TUpdateFilterParams& params) {}
-
-  virtual void PublishFilter(
-      TPublishFilterResult& return_val, const TPublishFilterParams& params) {}
+  virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
+      const TExecQueryFInstancesParams& params) {}
+  virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
+      const TCancelQueryFInstancesParams& params) {}
+  virtual void ReportExecStatus(TReportExecStatusResult& return_val,
+      const TReportExecStatusParams& params) {}
+  virtual void UpdateFilter(TUpdateFilterResult& return_val,
+      const TUpdateFilterParams& params) {}
+  virtual void PublishFilter(TPublishFilterResult& return_val,
+      const TPublishFilterParams& params) {}
 
   virtual void TransmitData(
       TTransmitDataResult& return_val, const TTransmitDataParams& params) {
@@ -116,7 +112,7 @@ class DataStreamTest : public testing::Test {
   DataStreamTest() : next_val_(0) {
     // Initialize MemTrackers and RuntimeState for use by the data stream 
receiver.
     exec_env_.InitForFeTests();
-    runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_, 
"test-pool"));
+    runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_));
 
     // Stop tests that rely on mismatched sender / receiver pairs timing out 
from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
@@ -275,8 +271,7 @@ class DataStreamTest : public testing::Test {
     slot_desc.__set_nullIndicatorBit(-1);
     slot_desc.__set_slotIdx(0);
     thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
-    EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_));
-    runtime_state_->set_desc_tbl(desc_tbl_);
+    EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, nullptr, 
&desc_tbl_));
 
     vector<TTupleId> row_tids;
     row_tids.push_back(0);
@@ -480,8 +475,7 @@ class DataStreamTest : public testing::Test {
 
   void Sender(
       int sender_num, int channel_buffer_size, TPartitionType::type 
partition_type) {
-    RuntimeState state(TQueryCtx(), &exec_env_, "test-pool");
-    state.set_desc_tbl(desc_tbl_);
+    RuntimeState state(TQueryCtx(), &exec_env_, desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataStreamSink& sink = GetSink(partition_type);
     DataStreamSender sender(
@@ -593,8 +587,7 @@ TEST_F(DataStreamTest, BasicTest) {
 //
 // TODO: Make lifecycle requirements more explicit.
 TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(
-      new RuntimeState(TQueryCtx(), &exec_env_, "test-pool"));
+  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), 
&exec_env_));
   scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, 
"TestReceiver"));
 
   // Start just one receiver.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/debug-options.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/debug-options.cc b/be/src/runtime/debug-options.cc
new file mode 100644
index 0000000..7a63742
--- /dev/null
+++ b/be/src/runtime/debug-options.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/algorithm/string.hpp>
+
+#include "runtime/debug-options.h"
+#include "common/logging.h"
+
+using namespace impala;
+using namespace std;
+using namespace boost;
+
+// Parses the 'debug_action' query option, which has the form
+// [<instance idx>:]<node id>:<phase>:<action>. All members keep their "not set"
+// defaults if the option is absent, empty, or has the wrong number of components.
+DebugOptions::DebugOptions(const TQueryOptions& query_options)
+  : instance_idx_(-1),
+    node_id_(-1),
+    action_(TDebugAction::WAIT),
+    phase_(TExecNodePhase::INVALID) {
+  const bool has_debug_action =
+      query_options.__isset.debug_action && !query_options.debug_action.empty();
+  if (!has_debug_action) return;
+
+  vector<string> tokens;
+  split(tokens, query_options.debug_action, is_any_of(":"), token_compress_on);
+  const size_t num_tokens = tokens.size();
+  if (num_tokens != 3 && num_tokens != 4) return;
+
+  // The optional instance index, when present, is the leading token and shifts
+  // the remaining three tokens by one position.
+  const size_t first = num_tokens - 3;
+  instance_idx_ = (first == 1) ? atoi(tokens[0].c_str()) : -1;
+  node_id_ = atoi(tokens[first].c_str());
+  phase_ = GetExecNodePhase(tokens[first + 1]);
+  action_ = GetDebugAction(tokens[first + 2]);
+
+  DCHECK(!(phase_ == TExecNodePhase::CLOSE && action_ == TDebugAction::WAIT))
+      << "Do not use CLOSE:WAIT debug actions because nodes cannot be cancelled in "
+         "Close()";
+}
+
+// Maps 'key' to the TExecNodePhase value whose name matches it case-insensitively;
+// returns TExecNodePhase::INVALID if no name matches.
+TExecNodePhase::type DebugOptions::GetExecNodePhase(const string& key) {
+  for (const auto& entry : _TExecNodePhase_VALUES_TO_NAMES) {
+    if (iequals(key, entry.second)) {
+      return static_cast<TExecNodePhase::type>(entry.first);
+    }
+  }
+  return TExecNodePhase::INVALID;
+}
+
+// Maps 'key' to the TDebugAction value whose name matches it case-insensitively;
+// falls back to TDebugAction::WAIT if no name matches.
+TDebugAction::type DebugOptions::GetDebugAction(const string& key) {
+  // Bind by const reference so that map entries are not copied on each iteration.
+  for (const auto& entry : _TDebugAction_VALUES_TO_NAMES) {
+    if (iequals(key, entry.second)) {
+      return static_cast<TDebugAction::type>(entry.first);
+    }
+  }
+  return TDebugAction::WAIT;
+}
+
+
+// Builds the thrift representation from the node id, phase and action;
+// instance_idx_ is not part of the result.
+TDebugOptions DebugOptions::ToThrift() const {
+  TDebugOptions thrift_opts;
+  thrift_opts.__set_action(action_);
+  thrift_opts.__set_phase(phase_);
+  thrift_opts.__set_node_id(node_id_);
+  return thrift_opts;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/debug-options.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/debug-options.h b/be/src/runtime/debug-options.h
new file mode 100644
index 0000000..95f1f65
--- /dev/null
+++ b/be/src/runtime/debug-options.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_DEBUG_OPTIONS_H
+#define IMPALA_RUNTIME_DEBUG_OPTIONS_H
+
+#include <vector>
+#include <string>
+
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/Types_types.h"
+
+namespace impala {
+
+// Container for debug options in TPlanFragmentInstanceCtx (debug_node, 
debug_action,
+// debug_phase).
+// TODO: move to subdirectory /coordinator
+class DebugOptions {
+ public:
+  DebugOptions(const TQueryOptions& query_options);  // parses 'debug_action'
+  TDebugOptions ToThrift() const;  // copies node id, phase and action only
+
+  /// Query-wide fragment instance index; -1 if not set.
+  int instance_idx() const { return instance_idx_; }
+
+  /// Node id parsed from the option; -1 if no debug options are set.
+  int node_id() const { return node_id_; }
+
+  TDebugAction::type action() const { return action_; }  // defaults to WAIT
+  TExecNodePhase::type phase() const { return phase_; }  // defaults to INVALID
+
+ private:
+  int instance_idx_;
+  int node_id_;
+  TDebugAction::type action_;
+  TExecNodePhase::type phase_;  // INVALID: debug options invalid
+
+  static TExecNodePhase::type GetExecNodePhase(const std::string& key);
+  static TDebugAction::type GetDebugAction(const std::string& key);
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 709ddb1..bb7c1d0 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -480,7 +480,7 @@ string RowDescriptor::DebugString() const {
 }
 
 Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& 
thrift_tbl,
-                             DescriptorTbl** tbl) {
+    MemTracker* mem_tracker, DescriptorTbl** tbl) {
   *tbl = pool->Add(new DescriptorTbl());
   // deserialize table descriptors first, they are being referenced by tuple 
descriptors
   for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) {
@@ -489,6 +489,21 @@ Status DescriptorTbl::Create(ObjectPool* pool, const 
TDescriptorTable& thrift_tb
     switch (tdesc.tableType) {
       case TTableType::HDFS_TABLE:
         desc = pool->Add(new HdfsTableDescriptor(tdesc, pool));
+
+        if (mem_tracker != nullptr) {
+          // prepare and open partition exprs
+          const HdfsTableDescriptor* hdfs_tbl =
+              static_cast<const HdfsTableDescriptor*>(desc);
+          for (const auto& part_entry : hdfs_tbl->partition_descriptors()) {
+            // TODO: RowDescriptor should arguably be optional in Prepare for 
known
+            // literals. Partition exprs are not used in the codegen case.  
Don't codegen
+            // them.
+            
RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(),
+                nullptr, RowDescriptor(), mem_tracker));
+            RETURN_IF_ERROR(Expr::Open(
+                part_entry.second->partition_key_value_ctxs(), nullptr));
+          }
+        }
         break;
       case TTableType::HBASE_TABLE:
         desc = pool->Add(new HBaseTableDescriptor(tdesc));
@@ -530,27 +545,14 @@ Status DescriptorTbl::Create(ObjectPool* pool, const 
TDescriptorTable& thrift_tb
   return Status::OK();
 }
 
-Status DescriptorTbl::PrepareAndOpenPartitionExprs(RuntimeState* state) const {
-  for (const auto& tbl_entry : tbl_desc_map_) {
-    if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue;
-    HdfsTableDescriptor* hdfs_tbl = 
static_cast<HdfsTableDescriptor*>(tbl_entry.second);
-    for (const auto& part_entry : hdfs_tbl->partition_descriptors()) {
-      // TODO: RowDescriptor should arguably be optional in Prepare for known 
literals
-      // Partition exprs are not used in the codegen case.  Don't codegen them.
-      
RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(), 
state,
-          RowDescriptor(), state->instance_mem_tracker()));
-      
RETURN_IF_ERROR(Expr::Open(part_entry.second->partition_key_value_ctxs(), 
state));
-    }
-  }
-  return Status::OK();
-}
-
-void DescriptorTbl::ClosePartitionExprs(RuntimeState* state) const {
-  for (const auto& tbl_entry: tbl_desc_map_) {
-    if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue;
-    HdfsTableDescriptor* hdfs_tbl = 
static_cast<HdfsTableDescriptor*>(tbl_entry.second);
+void DescriptorTbl::ReleaseResources() {
+  // close partition exprs of hdfs tables
+  for (auto entry: tbl_desc_map_) {
+    if (entry.second->type() != TTableType::HDFS_TABLE) continue;
+    const HdfsTableDescriptor* hdfs_tbl =
+        static_cast<const HdfsTableDescriptor*>(entry.second);
     for (const auto& part_entry: hdfs_tbl->partition_descriptors()) {
-      Expr::Close(part_entry.second->partition_key_value_ctxs(), state);
+      Expr::Close(part_entry.second->partition_key_value_ctxs(), nullptr);
     }
   }
 }
@@ -585,7 +587,6 @@ SlotDescriptor* DescriptorTbl::GetSlotDescriptor(SlotId id) 
const {
   }
 }
 
-// return all registered tuple descriptors
 void DescriptorTbl::GetTupleDescs(vector<TupleDescriptor*>* descs) const {
   descs->clear();
   for (TupleDescriptorMap::const_iterator i = tuple_desc_map_.begin();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 26f58d3..06c65c5 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -46,6 +46,7 @@ class LlvmBuilder;
 class LlvmCodeGen;
 class ObjectPool;
 class RuntimeState;
+class MemTracker;
 class TDescriptorTable;
 class TSlotDescriptor;
 class TTable;
@@ -283,7 +284,6 @@ class HdfsPartitionDescriptor {
   /// The Prepare()/Open()/Close() cycle is controlled by the containing 
descriptor table
   /// because the same partition descriptor may be used by multiple exec nodes 
with
   /// different lifetimes.
-  /// TODO: Move these into the new query-wide state, indexed by partition id.
   std::vector<ExprContext*> partition_key_value_ctxs_;
 
   /// The format (e.g. text, sequence file etc.) of data in the files in this 
partition
@@ -461,15 +461,16 @@ class TupleDescriptor {
 class DescriptorTbl {
  public:
   /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it 
via 'tbl'.
+  /// If 'mem_tracker' != nullptr, also opens partition exprs for hdfs tables 
(and does
+  /// memory allocation against that tracker).
   /// Returns OK on success, otherwise error (in which case 'tbl' will be 
unset).
+  /// TODO: when cleaning up ExprCtx, remove the need to pass in a memtracker 
for literal
+  /// exprs that don't require additional memory at runtime.
   static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
-                       DescriptorTbl** tbl);
+      MemTracker* mem_tracker, DescriptorTbl** tbl);
 
-  /// Prepares and opens partition exprs of Hdfs tables.
-  Status PrepareAndOpenPartitionExprs(RuntimeState* state) const;
-
-  /// Closes partition exprs of Hdfs tables.
-  void ClosePartitionExprs(RuntimeState* state) const;
+  /// Free memory allocated in Create().
+  void ReleaseResources();
 
   TableDescriptor* GetTableDescriptor(TableId id) const;
   TupleDescriptor* GetTupleDescriptor(TupleId id) const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index e5d0c0b..6eeeaee 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -105,6 +105,7 @@ DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, 
"Deprecated");
 DEFINE_int32(resource_broker_send_timeout, 0, "Deprecated");
 DEFINE_int32(resource_broker_recv_timeout, 0, "Deprecated");
 
+// TODO-MT: rename or retire
 DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads 
available to "
     "start fragments on remote Impala daemons.");
 
@@ -150,7 +151,7 @@ ExecEnv::ExecEnv()
     tmp_file_mgr_(new TmpFileMgr),
     request_pool_service_(new RequestPoolService(metrics_.get())),
     frontend_(new Frontend()),
-    fragment_exec_thread_pool_(new 
CallableThreadPool("coordinator-fragment-rpc",
+    exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool",
         "worker", FLAGS_coordinator_rpc_threads, 
numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 
10000)),
     query_exec_mgr_(new QueryExecMgr()),
@@ -215,7 +216,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int subscriber_port,
         CreateHdfsOpThreadPool("hdfs-worker-pool", 
FLAGS_num_hdfs_worker_threads, 1024)),
     tmp_file_mgr_(new TmpFileMgr),
     frontend_(new Frontend()),
-    fragment_exec_thread_pool_(new 
CallableThreadPool("coordinator-fragment-rpc",
+    exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool",
         "worker", FLAGS_coordinator_rpc_threads, 
numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 
10000)),
     query_exec_mgr_(new QueryExecMgr()),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 87824ba..43b0e0b 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -88,9 +88,7 @@ class ExecEnv {
   ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); }
   HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); 
}
   TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
-  CallableThreadPool* fragment_exec_thread_pool() {
-    return fragment_exec_thread_pool_.get();
-  }
+  CallableThreadPool* exec_rpc_thread_pool() { return 
exec_rpc_thread_pool_.get(); }
   ImpalaServer* impala_server() { return impala_server_; }
   Frontend* frontend() { return frontend_.get(); }
   RequestPoolService* request_pool_service() { return 
request_pool_service_.get(); }
@@ -141,7 +139,7 @@ class ExecEnv {
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
   boost::scoped_ptr<RequestPoolService> request_pool_service_;
   boost::scoped_ptr<Frontend> frontend_;
-  boost::scoped_ptr<CallableThreadPool> fragment_exec_thread_pool_;
+  boost::scoped_ptr<CallableThreadPool> exec_rpc_thread_pool_;
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index b0adde9..9376767 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -18,150 +18,442 @@
 
 #include "runtime/fragment-instance-state.h"
 
+#include <sstream>
+#include <thrift/protocol/TDebugProtocol.h>
 #include <boost/thread/locks.hpp>
+#include <boost/thread/thread_time.hpp>
 #include <boost/thread/lock_guard.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
 
+#include "common/names.h"
+#include "codegen/llvm-codegen.h"
+#include "exec/plan-root-sink.h"
+#include "exec/exec-node.h"
+#include "exec/hdfs-scan-node-base.h"  // for PerVolumeStats
+#include "exec/exchange-node.h"
+#include "exec/scan-node.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/client-cache.h"
 #include "runtime/runtime-state.h"
 #include "runtime/query-state.h"
+#include "runtime/query-state.h"
+#include "runtime/data-stream-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "scheduling/query-schedule.h"
+#include "util/debug-util.h"
+#include "util/container-util.h"
+#include "util/periodic-counter-updater.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
-#include "common/names.h"
+DEFINE_int32(status_report_interval, 5, "interval between profile reports; in 
seconds");
 
 using namespace impala;
+using namespace apache::thrift;
+
+const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = 
"PerHostPeakMemUsage";
+
+static const string OPEN_TIMER_NAME = "OpenTime";
+static const string PREPARE_TIMER_NAME = "PrepareTime";
+static const string EXEC_TIMER_NAME = "ExecTime";
+
 
 FragmentInstanceState::FragmentInstanceState(
     QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& 
desc_tbl)
+    const TPlanFragmentInstanceCtx& instance_ctx)
   : query_state_(query_state),
     fragment_ctx_(fragment_ctx),
-    instance_ctx_(instance_ctx),
-    desc_tbl_(desc_tbl),
-    executor_(
-        [this](const Status& status, RuntimeProfile* profile, bool done) {
-          ReportStatusCb(status, profile, done);
-        }) {
+    instance_ctx_(instance_ctx) {
+}
+
+Status FragmentInstanceState::Exec() {
+  Status status = Prepare();
+  DCHECK(runtime_state_ != nullptr);  // we need to guarantee at least that
+  prepared_promise_.Set(status);
+  if (!status.ok()) {
+    opened_promise_.Set(status);
+    goto done;
+  }
+  status = Open();
+  opened_promise_.Set(status);
+  if (!status.ok()) goto done;
+
+  {
+    // Must go out of scope before Finalize(), otherwise counter will not be
+    // updated by time final profile is sent.
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME));
+    status = ExecInternal();
+  }
+
+done:
+  // call this before Close() to make sure the thread token got released
+  Finalize(status);
+  Close();
+  return status;
 }
 
-Status FragmentInstanceState::UpdateStatus(const Status& status) {
-  lock_guard<mutex> l(status_lock_);
-  if (!status.ok() && exec_status_.ok()) exec_status_ = status;
-  return exec_status_;
+void FragmentInstanceState::Cancel() {
+  WaitForPrepare();  // make sure Prepare() finished
+
+  // Ensure that the sink is closed from both sides. Although in ordinary 
executions we
+  // rely on the consumer to do this, in error cases the consumer may not be 
able to send
+  // CloseConsumer() (see IMPALA-4348 for an example).
+  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
+
+  DCHECK(runtime_state_ != nullptr);
+  runtime_state_->set_is_cancelled();
+  runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
-Status FragmentInstanceState::Cancel() {
-  lock_guard<mutex> l(status_lock_);
-  RETURN_IF_ERROR(exec_status_);
-  executor_.Cancel();
+Status FragmentInstanceState::Prepare() {
+  DCHECK(!prepared_promise_.IsSet());
+
+  VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);
+
+  // Do not call RETURN_IF_ERROR or explicitly return before this line,
+  // runtime_state_ != nullptr is a postcondition of this function.
+  runtime_state_ = obj_pool()->Add(new RuntimeState(
+      query_state_, fragment_ctx_, instance_ctx_, ExecEnv::GetInstance()));
+
+  // total_time_counter() is in the runtime_state_ so start it up now.
+  SCOPED_TIMER(profile()->total_time_counter());
+  timings_profile_ = obj_pool()->Add(
+      new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings"));
+  profile()->AddChild(timings_profile_);
+  SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
+
+  // TODO: move this into a RuntimeState::Init()
+  RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
+  runtime_state_->InitFilterBank();
+
+  // Reserve one main thread from the pool
+  runtime_state_->resource_pool()->AcquireThreadToken();
+
+  avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
+      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
+          runtime_state_->resource_pool()));
+  mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
+      TUnit::BYTES,
+      bind<int64_t>(mem_fn(&MemTracker::consumption),
+          runtime_state_->instance_mem_tracker()));
+  thread_usage_sampled_counter_ = 
profile()->AddTimeSeriesCounter("ThreadUsage",
+      TUnit::UNIT,
+      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
+          runtime_state_->resource_pool()));
+
+  // set up plan
+  RETURN_IF_ERROR(ExecNode::CreateTree(
+      runtime_state_, fragment_ctx_.fragment.plan, query_state_->desc_tbl(),
+      &exec_tree_));
+  runtime_state_->set_fragment_root_id(exec_tree_->id());
+  if (instance_ctx_.__isset.debug_options) {
+    ExecNode::SetDebugOptions(instance_ctx_.debug_options, exec_tree_);
+  }
+
+  // set #senders of exchange nodes before calling Prepare()
+  vector<ExecNode*> exch_nodes;
+  exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
+  for (ExecNode* exch_node : exch_nodes) {
+    DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
+    int num_senders =
+        FindWithDefault(instance_ctx_.per_exch_num_senders, exch_node->id(), 
0);
+    DCHECK_GT(num_senders, 0);
+    static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
+  }
+
+  // set scan ranges
+  vector<ExecNode*> scan_nodes;
+  vector<TScanRangeParams> no_scan_ranges;
+  exec_tree_->CollectScanNodes(&scan_nodes);
+  for (ExecNode* scan_node: scan_nodes) {
+    const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
+        instance_ctx_.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
+    static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges);
+  }
+
+  RuntimeProfile::Counter* prepare_timer =
+      ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", 
PREPARE_TIMER_NAME);
+  {
+    SCOPED_TIMER(prepare_timer);
+    RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_));
+  }
+  PrintVolumeIds();
+
+  // prepare sink_
+  DCHECK(fragment_ctx_.fragment.__isset.output_sink);
+  RETURN_IF_ERROR(
+      DataSink::Create(
+          obj_pool(), fragment_ctx_, instance_ctx_, exec_tree_->row_desc(), 
&sink_));
+  RETURN_IF_ERROR(sink_->Prepare(runtime_state_, 
runtime_state_->instance_mem_tracker()));
+  RuntimeProfile* sink_profile = sink_->profile();
+  if (sink_profile != nullptr) profile()->AddChild(sink_profile);
+
+  if (fragment_ctx_.fragment.output_sink.type == 
TDataSinkType::PLAN_ROOT_SINK) {
+    root_sink_ = reinterpret_cast<PlanRootSink*>(sink_);
+    // Release the thread token on the root fragment instance. This fragment 
spends most
+    // of the time waiting and doing very little work. Holding on to the token 
causes
+    // underutilization of the machine. If there are 12 queries on this node, 
that's 12
+    // tokens reserved for no reason.
+    ReleaseThreadToken();
+  }
+
+  if (runtime_state_->ShouldCodegen()) {
+    RETURN_IF_ERROR(runtime_state_->CreateCodegen());
+    exec_tree_->Codegen(runtime_state_);
+    // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is 
fixed,
+    // ScalarFnCall has no fall back to interpretation when codegen fails so 
propagates
+    // the error status for now.
+    RETURN_IF_ERROR(runtime_state_->CodegenScalarFns());
+  }
+
+  // set up profile counters
+  profile()->AddChild(exec_tree_->runtime_profile());
+  rows_produced_counter_ =
+      ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
+  per_host_mem_usage_ =
+      ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
+
+  row_batch_.reset(
+      new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
+        runtime_state_->instance_mem_tracker()));
+  VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
+
+  // We need to start the profile-reporting thread before calling Open(),
+  // since it may block.
+  if (FLAGS_status_report_interval > 0) {
+    unique_lock<mutex> l(report_thread_lock_);
+    report_thread_.reset(
+        new Thread("plan-fragment-executor", "report-profile",
+            &FragmentInstanceState::ReportProfileThread, this));
+    // Make sure the thread started up, otherwise ReportProfileThread() might 
get into
+    // a race with StopReportThread().
+    while (!report_thread_active_) report_thread_started_cv_.wait(l);
+  }
+
   return Status::OK();
 }
 
-void FragmentInstanceState::Exec() {
-  Status status =
-      executor_.Prepare(query_state_, desc_tbl_, fragment_ctx_, instance_ctx_);
-  prepare_promise_.Set(status);
-  if (status.ok()) {
-    if (executor_.Open().ok()) {
-      executor_.Exec();
-    }
+Status FragmentInstanceState::Open() {
+  DCHECK(prepared_promise_.IsSet());
+  DCHECK(!opened_promise_.IsSet());
+  SCOPED_TIMER(profile()->total_time_counter());
+  SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
+  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
+
+  // codegen prior to exec_tree_->Open()
+  if (runtime_state_->ShouldCodegen()) {
+    LlvmCodeGen* codegen = runtime_state_->codegen();
+    DCHECK(codegen != nullptr);
+    RETURN_IF_ERROR(codegen->FinalizeModule());
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", 
OPEN_TIMER_NAME));
+    RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
   }
-  executor_.Close();
-}
-
-void FragmentInstanceState::ReportStatusCb(
-    const Status& status, RuntimeProfile* profile, bool done) {
-  DCHECK(status.ok() || done);  // if !status.ok() => done
-  Status exec_status = UpdateStatus(status);
-
-  Status coord_status;
-  ImpalaBackendConnection coord(
-      ExecEnv::GetInstance()->impalad_client_cache(), coord_address(), 
&coord_status);
-  if (!coord_status.ok()) {
-    stringstream s;
-    s << "Couldn't get a client for " << coord_address() <<"\tReason: "
-      << coord_status.GetDetail();
-    UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str())));
-    return;
-  }
-
-  TReportExecStatusParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_query_id(query_state_->query_ctx().query_id);
-  params.__set_fragment_instance_id(instance_ctx_.fragment_instance_id);
-  exec_status.SetTStatus(&params);
-  params.__set_done(done);
-
-  if (profile != NULL) {
-    profile->ToThrift(&params.profile);
-    params.__isset.profile = true;
-  }
-
-  RuntimeState* runtime_state = executor_.runtime_state();
-  // If executor_ did not successfully prepare, runtime state may not have 
been set.
-  if (runtime_state != NULL) {
-    // Only send updates to insert status if fragment is finished, the 
coordinator
-    // waits until query execution is done to use them anyhow.
-    if (done) {
-      TInsertExecStatus insert_status;
-
-      if (runtime_state->hdfs_files_to_move()->size() > 0) {
-        
insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move());
-      }
-      if (runtime_state->per_partition_status()->size() > 0) {
-        
insert_status.__set_per_partition_status(*runtime_state->per_partition_status());
-      }
-
-      params.__set_insert_exec_status(insert_status);
+  return sink_->Open(runtime_state_);
+}
+
+Status FragmentInstanceState::ExecInternal() {
+  RuntimeProfile::Counter* plan_exec_timer =
+      ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
+  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
+  bool exec_tree_complete = false;
+  do {
+    Status status;
+    row_batch_->Reset();
+    {
+      SCOPED_TIMER(plan_exec_timer);
+      RETURN_IF_ERROR(
+          exec_tree_->GetNext(runtime_state_, row_batch_.get(), 
&exec_tree_complete));
     }
+    if (VLOG_ROW_IS_ON) 
row_batch_->VLogRows("FragmentInstanceState::ExecInternal()");
+    COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
+    RETURN_IF_ERROR(sink_->Send(runtime_state_, row_batch_.get()));
+  } while (!exec_tree_complete);
+
+  // Flush the sink *before* stopping the report thread. Flush may need to add 
some
+  // important information to the last report that gets sent. (e.g. table 
sinks record the
+  // files they have written to in this method)
+  RETURN_IF_ERROR(sink_->FlushFinal(runtime_state()));
+  return Status::OK();
+}
 
-    // Send new errors to coordinator
-    runtime_state->GetUnreportedErrors(&(params.error_log));
-  }
-  params.__isset.error_log = (params.error_log.size() > 0);
-
-  TReportExecStatusResult res;
-  Status rpc_status;
-  bool retry_is_safe;
-  // Try to send the RPC 3 times before failing.
-  for (int i = 0; i < 3; ++i) {
-    rpc_status = coord.DoRpc(
-        &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe);
-    if (rpc_status.ok()) {
-      rpc_status = Status(res.status);
-      break;
+void FragmentInstanceState::Close() {
+  DCHECK(!report_thread_active_);
+  DCHECK(runtime_state_ != nullptr);
+
+  // guard against partially-finished Prepare()
+  if (sink_ != nullptr) sink_->Close(runtime_state_);
+
+  // disconnect mem_usage_sampled_counter_ from the periodic updater before
+  // RuntimeState::ReleaseResources(), it references the instance memtracker
+  if (mem_usage_sampled_counter_ != nullptr) {
+    PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
+    mem_usage_sampled_counter_ = nullptr;
+  }
+
+  // We need to delete row_batch_ here otherwise we can't delete the 
instance_mem_tracker_
+  // in runtime_state_->ReleaseResources().
+  // TODO: do not delete mem trackers in Close()/ReleaseResources(), they are 
part of
+  // the control structures we need to preserve until the underlying QueryState
+  // disappears.
+  row_batch_.reset();
+  if (exec_tree_ != nullptr) exec_tree_->Close(runtime_state_);
+  runtime_state_->ReleaseResources();
+
+  // Sanity timer checks
+#ifndef NDEBUG
+  if (profile() != nullptr && timings_profile_ != nullptr) {
+    int64_t total_time = profile()->total_time_counter()->value();
+    int64_t other_time = 0;
+    for (auto& name: {PREPARE_TIMER_NAME, OPEN_TIMER_NAME, EXEC_TIMER_NAME}) {
+      RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name);
+      if (counter != nullptr) other_time += counter->value();
     }
-    if (!retry_is_safe) break;
-    if (i < 2) SleepForMs(100);
+    // TODO: IMPALA-4631: Occasionally we see other_time = total_time + 1 for 
some reason
+    // we don't yet understand, so add 1 to total_time to avoid DCHECKing in 
that case.
+    DCHECK_LE(other_time, total_time + 1);
+  }
+#endif
+}
+
+// Body of the profile-reporting thread. Signals Prepare() that it has started,
+// then sends a non-final report every FLAGS_status_report_interval seconds until
+// report_thread_active_ is cleared. report_thread_lock_ is held except while
+// blocked in timed_wait().
+void FragmentInstanceState::ReportProfileThread() {
+  VLOG_FILE << "ReportProfileThread(): instance_id=" << PrintId(instance_id());
+  unique_lock<mutex> l(report_thread_lock_);
+  // tell Prepare() that we started
+  report_thread_active_ = true;
+  report_thread_started_cv_.notify_one();
+
+  // Jitter the reporting time of remote fragments by a random amount between
+  // 0 and the report_interval.  This way, the coordinator doesn't get all the
+  // updates at once so it's better for contention as well as smoother progress
+  // reporting.
+  int report_fragment_offset = rand() % FLAGS_status_report_interval;
+  boost::system_time jitter_timeout = boost::get_system_time()
+      + boost::posix_time::seconds(report_fragment_offset);
+  // We don't want to wait longer than it takes to run the entire fragment.
+  stop_report_thread_cv_.timed_wait(l, jitter_timeout);
+
+  while (report_thread_active_) {
+    boost::system_time timeout = boost::get_system_time()
+        + boost::posix_time::seconds(FLAGS_status_report_interval);
+
+    // timed_wait can return because the timeout occurred or the condition variable
+    // was signaled.  We can't rely on its return value to distinguish between the
+    // two cases (e.g. there is a race here where the wait timed out but before
+    // grabbing the lock, the condition variable was signaled).  Instead, we will
+    // use an external flag, report_thread_active_, to coordinate this.
+    stop_report_thread_cv_.timed_wait(l, timeout);
+
+    if (!report_thread_active_) break;
+    SendReport(false, Status::OK());
+  }
+
+  // Format the id with PrintId() so it matches the start-up message above.
+  VLOG_FILE << "exiting reporting thread: instance_id=" << PrintId(instance_id());
+}
+
+// Refreshes the per-host peak memory counter and hands the report to the QueryState.
+// 'done' marks the final report; only a final report may carry a non-OK 'status'.
+void FragmentInstanceState::SendReport(bool done, const Status& status) {
+  DCHECK(done || status.ok());
+  DCHECK(runtime_state_ != nullptr);
+
+  if (VLOG_FILE_IS_ON) {
+    const char* final_prefix = done ? "final " : "";
+    VLOG_FILE << "Reporting " << final_prefix << "profile for instance "
+        << runtime_state_->fragment_instance_id();
+    stringstream pretty_profile;
+    profile()->PrettyPrint(&pretty_profile);
+    VLOG_FILE << pretty_profile.str();
+  }
+
+  // Publish the query MemTracker's peak consumption as the per-host peak mem usage.
+  if (per_host_mem_usage_ != nullptr) {
+    const auto peak = runtime_state_->query_mem_tracker()->peak_consumption();
+    per_host_mem_usage_->Set(peak);
+  }
+
+  query_state_->ReportExecStatus(done, status, this);
+}
+
+// Stops the profile reporting thread, if it is active, and blocks until it has exited.
+// Returns immediately when the thread is not active, which makes repeated calls safe.
+void FragmentInstanceState::StopReportThread() {
+  {
+    lock_guard<mutex> l(report_thread_lock_);
+    // report_thread_active_ is protected by report_thread_lock_, so it is also read
+    // under the lock; an unlocked read would race with the reporting thread.
+    if (!report_thread_active_) return;
+    report_thread_active_ = false;
   }
-  if (!rpc_status.ok()) {
-    UpdateStatus(rpc_status);
-    executor_.Cancel();
+  // Wake the reporting thread out of its timed wait, then wait for it to finish.
+  stop_report_thread_cv_.notify_one();
+  report_thread_->Join();
+}
+
+// Wraps up execution: releases the thread token if still held, stops the reporting
+// thread and sends the final report carrying 'status'.
+void FragmentInstanceState::Finalize(const Status& status) {
+  const bool has_plan_root_sink =
+      fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK;
+  // If we haven't already released this thread token in Prepare(), release it now.
+  if (!has_plan_root_sink) ReleaseThreadToken();
+
+  // The final report is only safe to send once the reporting thread is stopped.
+  StopReportThread();
+  SendReport(true, status);
+}
+
+// Gives this instance's thread token back to the resource pool and stops the counters
+// that sample thread-token usage. Requires runtime_state_ and its resource pool.
+void FragmentInstanceState::ReleaseThreadToken() {
+  DCHECK(runtime_state_ != nullptr);
+  DCHECK(runtime_state_->resource_pool() != nullptr);
+  runtime_state_->resource_pool()->ReleaseThreadToken(true);
+
+  // Either counter may still be nullptr if it was never created.
+  if (avg_thread_tokens_ != nullptr) {
+    PeriodicCounterUpdater::StopSamplingCounter(avg_thread_tokens_);
+  }
+  if (thread_usage_sampled_counter_ != nullptr) {
+    PeriodicCounterUpdater::StopTimeSeriesCounter(thread_usage_sampled_counter_);
   }
 }
 
+// Blocks until prepared_promise_ has been set and returns the status stored in it.
+Status FragmentInstanceState::WaitForPrepare() {
+  return prepared_promise_.Get();
+}
+
+// Returns whether prepared_promise_ has been set yet.
+bool FragmentInstanceState::IsPrepared() {
+  return prepared_promise_.IsSet();
+}
+
+// Blocks until opened_promise_ has been set and returns the status stored in it.
+Status FragmentInstanceState::WaitForOpen() {
+  return opened_promise_.Get();
+}
+
+// Forwards a global bloom filter to this instance's filter bank. Waits, with no
+// timeout, for the Prepare phase to finish; the filter is dropped if Prepare failed.
 void FragmentInstanceState::PublishFilter(
     int32_t filter_id, const TBloomFilter& thrift_bloom_filter) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
             << " filter_id=" << filter_id;
-  // Defensively protect against blocking forever in case there's some problem 
with
-  // Prepare().
-  static const int WAIT_MS = 30000;
-  bool timed_out = false;
   // Wait until Prepare() is done, so we know that the filter bank is set up.
-  // TODO: get rid of concurrency in the setup phase as part of the per-query 
exec rpc
-  Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out);
-  if (timed_out) {
-    LOG(ERROR) << "Unexpected timeout in PublishFilter()";
-    return;
-  }
-  if (!prepare_status.ok()) return;
-  executor_.runtime_state()->filter_bank()->PublishGlobalFilter(
-      filter_id, thrift_bloom_filter);
+  if (!WaitForPrepare().ok()) return;
+  runtime_state_->filter_bank()->PublishGlobalFilter(filter_id, 
thrift_bloom_filter);
 }
 
+// Returns the query context held by the owning QueryState.
 const TQueryCtx& FragmentInstanceState::query_ctx() const {
   return query_state_->query_ctx();
 }
+
+// Returns the object pool of the owning QueryState.
+ObjectPool* FragmentInstanceState::obj_pool() {
+  return query_state_->obj_pool();
+}
+
+// Returns the runtime profile owned by runtime_state_. Dereferences runtime_state_,
+// which stays nullptr until it is set in Prepare(), so this must not be called earlier.
+RuntimeProfile* FragmentInstanceState::profile() const {
+  return runtime_state_->runtime_profile();
+}
+
+// Aggregates HDFS split stats over all of this instance's scan ranges, records them
+// in the profile as an info string and writes them to the log. Does nothing if the
+// instance has no scan ranges.
+void FragmentInstanceState::PrintVolumeIds() {
+  if (instance_ctx_.per_node_scan_ranges.empty()) return;
+
+  HdfsScanNodeBase::PerVolumeStats per_volume_stats;
+  for (const auto& entry: instance_ctx_.per_node_scan_ranges) {
+    HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second, &per_volume_stats);
+  }
+
+  stringstream str;
+  HdfsScanNodeBase::PrintHdfsSplitStats(per_volume_stats, &str);
+  // Materialize the text once; it is used for both the profile and the log.
+  const string split_stats = str.str();
+  profile()->AddInfoString(HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC, split_stats);
+  // Use PrintId() so the query id is formatted like the ids in the other log messages.
+  VLOG_FILE
+      << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
+      << PrintId(query_id()) << ":\n" << split_stats;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h 
b/be/src/runtime/fragment-instance-state.h
index c938a31..28b8a54 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -19,11 +19,17 @@
 #ifndef IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
 #define IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
 
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+
 #include "common/status.h"
 #include "util/promise.h"
-#include "runtime/plan-fragment-executor.h"
 
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "runtime/row-batch.h"
+#include "util/promise.h"
+#include "util/runtime-profile.h"
 
 namespace impala {
 
@@ -32,85 +38,211 @@ class TPlanFragmentInstanceCtx;
 class TBloomFilter;
 class TUniqueId;
 class TNetworkAddress;
+class TQueryCtx;
 class QueryState;
-class PlanFragmentExecutor;
 class RuntimeProfile;
-
-/// Collection of state specific to the execution of a particular fragment 
instance,
-/// as well as manager of the execution of that fragment instance, including
-/// set-up and tear-down.
-/// Tear-down happens automatically in the d'tor and frees all memory 
allocated for
-/// this fragment instance and closes all data streams.
+class ExecNode;
+class PlanRootSink;
+class Thread;
+class DataSink;
+class RuntimeState;
+
+/// FragmentInstanceState handles all aspects of the execution of a single plan fragment
+/// instance, including setup and finalization, both in the success and error case.
+/// Close() happens automatically at the end of Exec() and frees all memory allocated
+/// for this fragment instance and closes all data streams.
+///
+/// The FIS makes an aggregated profile for the entire fragment available, which
+/// includes profile information for the plan itself as well as the output sink.
+/// The FIS periodically makes a ReportExecStatus RPC to the coordinator to report the
+/// execution status and profile. The frequency of those reports is controlled by the flag
+/// status_report_interval; setting that flag to 0 disables periodic reporting altogether.
+/// Regardless of the value of that flag, a report is sent at least once at the end of
+/// execution with an overall status and profile (and 'done' indicator).
+/// The FIS will send at least one final status report. If execution ended with an error,
+/// that error status will be part of the final report (it will not be overridden by
+/// the resulting cancellation).
 ///
-/// Aside from Cancel(), which may be called asynchronously, this class is not
-/// thread-safe.
+/// This class is thread-safe.
+/// All non-getter public functions other than Exec() block until the Prepare 
phase
+/// finishes.
+/// No member variables, other than the ones passed to the c'tor, are valid 
before
+/// the Prepare phase finishes.
 ///
 /// TODO:
-/// - merge PlanFragmentExecutor into this class
-/// - move tear-down logic out of d'tor and into ReleaseResources() function
-/// - as part of the per-query exec rpc, get rid of concurrency during setup
-///   (and remove prepare_promise_)
-/// - move ReportStatusCb() logic into PFE::SendReport() and get rid of the 
callback
+/// - absorb RuntimeState?
+/// - should WaitForPrepare/Open() return the overall execution status, if 
there
+///   was a failure?
 class FragmentInstanceState {
  public:
   FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& 
fragment_ctx,
-      const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& 
desc_tbl);
+      const TPlanFragmentInstanceCtx& instance_ctx);
 
-  /// Frees up all resources allocated in Exec().
-  /// It is an error to delete a FragmentInstanceState before Exec() returns.
-  ~FragmentInstanceState() { }
+  /// Main loop of fragment instance execution. Blocks until execution 
finishes and
+  /// automatically releases resources. Returns execution status.
+  /// Must only be called once.
+  Status Exec() WARN_UNUSED_RESULT;
 
-  /// Main loop of plan fragment execution. Blocks until execution finishes.
-  void Exec();
+  /// Cancels execution and sends a final status report. Idempotent.
+  void Cancel();
 
-  /// Returns current execution status, if there was an error. Otherwise 
cancels
-  /// the fragment and returns OK.
-  Status Cancel();
+  /// Blocks until the Prepare phase of Exec() is finished and returns the 
status.
+  Status WaitForPrepare();
+
+  /// Returns true if the Prepare phase of Exec() is finished.
+  bool IsPrepared();
+
+  /// Blocks until the Prepare phase of Exec() is finished and the exec tree is
+  /// opened, and returns that status. If the preparation phase encountered an error,
+  /// WaitForOpen() will return that error without blocking.
+  Status WaitForOpen();
 
   /// Publishes filter with ID 'filter_id' to this fragment instance's filter 
bank.
   void PublishFilter(int32_t filter_id, const TBloomFilter& 
thrift_bloom_filter);
 
+  /// Returns fragment instance's sink if this is the root fragment instance. 
Valid after
+  /// the Prepare phase. May be nullptr.
+  PlanRootSink* root_sink() { return root_sink_; }
+
+  /// Name of the counter that is tracking per query, per host peak mem usage.
+  /// TODO: this doesn't look like it belongs here
+  static const std::string PER_HOST_PEAK_MEM_COUNTER;
+
   QueryState* query_state() { return query_state_; }
-  PlanFragmentExecutor* executor() { return &executor_; }
+  RuntimeState* runtime_state() { return runtime_state_; }
+  RuntimeProfile* profile() const;
   const TQueryCtx& query_ctx() const;
   const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
   const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; 
}
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& instance_id() const { return 
instance_ctx_.fragment_instance_id; }
   const TNetworkAddress& coord_address() const { return 
query_ctx().coord_address; }
+  ObjectPool* obj_pool();
 
  private:
   QueryState* query_state_;
-  const TPlanFragmentCtx fragment_ctx_;
-  const TPlanFragmentInstanceCtx instance_ctx_;
-
-  /// instance-specific descriptor table
-  /// TODO: remove when switching to per-query exec rpc
-  const TDescriptorTable desc_tbl_;
-
-  PlanFragmentExecutor executor_;
-
-  /// protects exec_status_
-  boost::mutex status_lock_;
-
-  /// set in ReportStatusCb();
-  /// if set to anything other than OK, execution has terminated w/ an error
-  Status exec_status_;
-
-  /// Barrier for the completion of executor_.Prepare().
-  Promise<Status> prepare_promise_;
-
-  /// Update 'exec_status_' w/ 'status', if the former is not already an error.
-  /// Returns the value of 'exec_status_' after this method completes.
-  Status UpdateStatus(const Status& status);
-
-  /// Callback for executor; updates exec_status_ if 'status' indicates an 
error
-  /// or if there was a thrift error.
-  /// If not NULL, `profile` is encoded as a Thrift structure and transmitted 
as part of
-  /// the reporting RPC. `profile` may be NULL if a runtime profile has not 
been created
-  /// for this fragment (e.g. when the fragment has failed during preparation).
-  /// The executor must ensure that there is only one invocation at a time.
-  void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool 
done);
+  const TPlanFragmentCtx& fragment_ctx_;
+  const TPlanFragmentInstanceCtx& instance_ctx_;
+
+  /// All following member variables that are initialized to nullptr are set
+  /// in Prepare().
+
+  ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
+  RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
+
+  /// profile reporting-related
+  boost::scoped_ptr<Thread> report_thread_;
+  boost::mutex report_thread_lock_;
+
+  /// Indicates that profile reporting thread should stop.
+  /// Tied to report_thread_lock_.
+  boost::condition_variable stop_report_thread_cv_;
+
+  /// Indicates that profile reporting thread started.
+  /// Tied to report_thread_lock_.
+  boost::condition_variable report_thread_started_cv_;
+
+  /// When the report thread starts, it sets report_thread_active_ to true and 
signals
+  /// report_thread_started_cv_. The report thread is shut down by setting
+  /// report_thread_active_ to false and signalling stop_report_thread_cv_. 
Protected
+  /// by report_thread_lock_.
+  bool report_thread_active_ = false;
+
+  /// Profile for timings for each stage of the plan fragment instance's 
lifecycle.
+  /// Lives in obj_pool().
+  RuntimeProfile* timings_profile_ = nullptr;
+
+  /// Output sink for rows sent to this fragment. Created in Prepare(), lives 
in
+  /// obj_pool().
+  DataSink* sink_ = nullptr;
+
+  /// Set if this fragment instance is the root of the entire plan, so that a 
consumer can
+  /// pull results by calling root_sink_->GetNext(). Same object as sink_.
+  PlanRootSink* root_sink_ = nullptr;
+
+  /// should live in obj_pool(), but managed separately so we can delete it in 
Close()
+  boost::scoped_ptr<RowBatch> row_batch_;
+
+  /// Set when Prepare() returns.
+  Promise<Status> prepared_promise_;
+
+  /// Set when Open() returns.
+  Promise<Status> opened_promise_;
+
+  /// A counter for the per query, per host peak mem usage. Note that this is 
not the
+  /// max of the peak memory of all fragments running on a host since it needs 
to take
+  /// into account when they are running concurrently. All fragments for a 
single query
+  /// on a single host will have the same value for this counter.
+  RuntimeProfile::Counter* per_host_mem_usage_ = nullptr;
+
+  /// Number of rows returned by this fragment
+  /// TODO: by this instance?
+  RuntimeProfile::Counter* rows_produced_counter_ = nullptr;
+
+  /// Average number of thread tokens for the duration of the fragment 
instance execution.
+  /// Instances that do a lot of cpu work (non-coordinator fragment) will have 
at
+  /// least 1 token.  Instances that contain a hdfs scan node will have 1+ 
tokens
+  /// depending on system load.  Other nodes (e.g. hash join node) can also 
reserve
+  /// additional tokens.
+  /// This is a measure of how much CPU resources this instance used during 
the course
+  /// of the execution.
+  /// TODO-MT: remove
+  RuntimeProfile::Counter* avg_thread_tokens_ = nullptr;
+
+  /// Sampled memory usage at even time intervals.
+  RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_ = nullptr;
+
+  /// Sampled thread usage (tokens) at even time intervals.
+  RuntimeProfile::TimeSeriesCounter* thread_usage_sampled_counter_ = nullptr;
+
+  /// Prepare for execution. runtime_state() will not be valid until Prepare() 
is called.
+  /// runtime_state() will always be valid after Prepare() returns.
+  /// If request.query_options.mem_limit > 0, it is used as an
+  /// approximate limit on the number of bytes this query can consume at 
runtime.  The
+  /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
+  ///
+  /// A failure in Prepare() will result in partially-initialized state.
+  Status Prepare() WARN_UNUSED_RESULT;
+
+  /// Executes Open() logic and returns resulting status.
+  Status Open() WARN_UNUSED_RESULT;
+
+  /// Pulls row batches from exec_tree_ and pushes them to sink_ in a loop. 
Returns
+  /// OK if the input was exhausted and sent to the sink successfully, an 
error otherwise.
+  /// If ExecInternal() returns without an error condition, all rows will have 
been sent
+  /// to the sink and the sink will have been flushed.
+  Status ExecInternal() WARN_UNUSED_RESULT;
+
+  /// Closes the underlying fragment instance and frees up all resources 
allocated in
+  /// Prepare() and Open(). Assumes the report thread is stopped. Can handle
+  /// partially-finished Prepare().
+  void Close();
+
+  /// Main loop of profile reporting thread.
+  /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ 
is set to
+  /// false. This will not send the final report.
+  void ReportProfileThread();
+
+  /// Reports status and profile via QueryState::ReportExecStatus(). If 'done' is true,
+  /// sends the final report with 'status' and the profile. This type of report is sent
+  /// once and only by the instance execution thread.  Otherwise, a profile-only report
+  /// is sent, which the ReportProfileThread() thread will do periodically.
+  void SendReport(bool done, const Status& status);
+
+  /// Called when execution is complete to finalize counters and send the 
final status
+  /// report.  Must be called only once. Can handle partially-finished 
Prepare().
+  void Finalize(const Status& status);
+
+  /// Releases the thread token for this fragment executor. Can handle
+  /// partially-finished Prepare().
+  void ReleaseThreadToken();
+
+  /// Stops report thread, if one is running. Blocks until report thread 
terminates.
+  /// Idempotent.
+  void StopReportThread();
+
+  /// Print stats about scan ranges for each volumeId in params to info log.
+  void PrintVolumeIds();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index cfd1881..4ad9b26 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -151,7 +151,7 @@ MemTracker* 
PoolMemTrackerRegistry::GetRequestPoolMemTracker(
   lock_guard<SpinLock> l(pool_to_mem_trackers_lock_);
   PoolTrackersMap::iterator it = pool_to_mem_trackers_.find(pool_name);
   if (it != pool_to_mem_trackers_.end()) {
-    MemTracker* tracker = it->second;
+    MemTracker* tracker = it->second.get();
     DCHECK(pool_name == tracker->pool_name_);
     return tracker;
   }
@@ -161,7 +161,7 @@ MemTracker* 
PoolMemTrackerRegistry::GetRequestPoolMemTracker(
       new MemTracker(-1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, 
pool_name),
           ExecEnv::GetInstance()->process_mem_tracker());
   tracker->pool_name_ = pool_name;
-  pool_to_mem_trackers_[pool_name] = tracker;
+  pool_to_mem_trackers_.emplace(pool_name, unique_ptr<MemTracker>(tracker));
   return tracker;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 863d3c7..e3282c7 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -446,7 +446,7 @@ class PoolMemTrackerRegistry {
   /// All per-request pool MemTracker objects. It is assumed that request 
pools will live
   /// for the entire duration of the process lifetime so MemTrackers are never 
removed
   /// from this map. Protected by 'pool_to_mem_trackers_lock_'
-  typedef boost::unordered_map<std::string, MemTracker*> PoolTrackersMap;
+  typedef boost::unordered_map<std::string, std::unique_ptr<MemTracker>> 
PoolTrackersMap;
   PoolTrackersMap pool_to_mem_trackers_;
   /// IMPALA-3068: Use SpinLock instead of boost::mutex so that the lock won't
   /// automatically destroy itself as part of process teardown, which could 
cause races.

Reply via email to