http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 34e204d..88ee934 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -69,6 +69,7 @@ class TSessionState;
 class TQueryOptions;
 class TGetExecSummaryResp;
 class TGetExecSummaryReq;
+class ClientRequestState;
 
 /// An ImpalaServer contains both frontend and backend functionality;
 /// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service 
(HiveServer2)
@@ -82,14 +83,14 @@ class TGetExecSummaryReq;
 /// 1. session_state_map_lock_
 /// 2. SessionState::lock
 /// 3. query_expiration_lock_
-/// 4. query_exec_state_map_lock_
-/// 5. QueryExecState::fetch_rows_lock
-/// 6. QueryExecState::lock
-/// 7. QueryExecState::expiration_data_lock_
+/// 4. client_request_state_map_lock_
+/// 5. ClientRequestState::fetch_rows_lock
+/// 6. ClientRequestState::lock
+/// 7. ClientRequestState::expiration_data_lock_
 /// 8. Coordinator::exec_summary_lock
 ///
 /// Coordinator::lock_ should not be acquired at the same time as the
-/// ImpalaServer/SessionState/QueryExecState locks. Aside from
+/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
 /// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is 
independent of
 /// the above lock ordering.
 ///
@@ -142,7 +143,8 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// These APIs will not be implemented because ODBC driver does not use them.
   virtual void dump_config(std::string& config);
 
-  /// ImpalaService rpcs: extensions over Beeswax (implemented in 
impala-beeswax-server.cc)
+  /// ImpalaService rpcs: extensions over Beeswax (implemented in
+  /// impala-beeswax-server.cc)
   virtual void Cancel(impala::TStatus& status, const beeswax::QueryHandle& 
query_id);
   virtual void CloseInsert(impala::TInsertResult& insert_result,
       const beeswax::QueryHandle& query_handle);
@@ -156,7 +158,8 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   virtual void GetExecSummary(impala::TExecSummary& result,
       const beeswax::QueryHandle& query_id);
 
-  /// Performs a full catalog metadata reset, invalidating all table and 
database metadata.
+  /// Performs a full catalog metadata reset, invalidating all table and 
database
+  /// metadata.
   virtual void ResetCatalog(impala::TStatus& status);
 
   /// Resets the specified table's catalog metadata, forcing a reload on the 
next access.
@@ -229,7 +232,6 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
       apache::hive::service::cli::thrift::TRenewDelegationTokenResp& 
return_val,
       const apache::hive::service::cli::thrift::TRenewDelegationTokenReq& req);
 
-  /// ImpalaService common extensions (implemented in impala-server.cc)
   /// ImpalaInternalService rpcs
   void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params);
@@ -247,8 +249,8 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// SessionHandlerIf methods
 
   /// Called when a Beeswax or HS2 connection starts. For Beeswax, registers a 
new
-  /// SessionState associated with the new connection. For HS2, this is a 
no-op (HS2 has an
-  /// explicit CreateSession RPC).
+  /// SessionState associated with the new connection. For HS2, this is a 
no-op (HS2
+  /// has an explicit CreateSession RPC).
   virtual void ConnectionStart(const ThriftServer::ConnectionContext& 
session_context);
 
   /// Called when a Beeswax or HS2 connection terminates. Unregisters all 
sessions
@@ -267,6 +269,22 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& 
topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
+  /// Processes a CatalogUpdateResult returned from the CatalogServer and 
ensures
+  /// the update has been applied to the local impalad's catalog cache. If
+  /// wait_for_all_subscribers is true, this function will also wait until all
+  /// catalog topic subscribers have processed the update. Called from 
ClientRequestState
+  /// after executing any statement that modifies the catalog.
+  /// If wait_for_all_subscribers is false AND if the TCatalogUpdateResult 
contains
+  /// TCatalogObject(s) to add and/or remove, this function will update the 
local cache
+  /// by directly calling UpdateCatalog() with the TCatalogObject results.
+  /// Otherwise this function will wait until the local impalad's catalog 
cache has been
+  /// updated from a statestore heartbeat that includes this catalog update's 
catalog
+  /// version. If wait_for_all_subscribers is true, this function also waits for all other
+  /// catalog topic subscribers to process this update by checking the current
+  /// min_subscriber_topic_version included in each state store heartbeat.
+  Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& 
catalog_update_result,
+      bool wait_for_all_subscribers) WARN_UNUSED_RESULT;
+
   /// Returns true if lineage logging is enabled, false otherwise.
   bool IsLineageLoggingEnabled();
 
@@ -282,17 +300,96 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// The prefix of audit event log filename.
   static const string AUDIT_EVENT_LOG_FILE_PREFIX;
 
+  /// Per-session state.  This object is reference counted using shared_ptrs.  
There
+  /// is one ref count in the SessionStateMap for as long as the session is 
active.
+  /// All queries running from this session also have a reference.
+  struct SessionState {
+    /// The default hs2_version must be V1 so that child queries (which use 
HS2, but may
+    /// run as children of Beeswax sessions) get results back in the expected 
format -
+    /// child queries inherit the HS2 version from their parents, and a 
Beeswax session
+    /// will never update the HS2 version from the default.
+    SessionState() : closed(false), expired(false),
+        hs2_version(apache::hive::service::cli::thrift::
+        TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), 
ref_count(0) {
+    }
+
+    TSessionType::type session_type;
+
+    /// Time the session was created, in ms since epoch (UTC).
+    int64_t start_time_ms;
+
+    /// Connected user for this session, i.e. the user which originated this 
session.
+    std::string connected_user;
+
+    /// The user to delegate to. Empty for no delegation.
+    std::string do_as_user;
+
+    /// Client network address.
+    TNetworkAddress network_address;
+
+    /// Protects all fields below. See "Locking" in the class comment for lock
+    /// acquisition order.
+    boost::mutex lock;
+
+    /// If true, the session has been closed.
+    bool closed;
+
+    /// If true, the session was idle for too long and has been expired. Only 
set when
+    /// ref_count == 0, after which point ref_count should never become 
non-zero (since
+    /// clients will use ScopedSessionState to access the session, which will 
prevent use
+    /// after expiration).
+    bool expired;
+
+    /// The default database (changed as a result of 'use' query execution).
+    std::string database;
+
+    /// The default query options of this session. When the session is 
created, the
+    /// session inherits the global defaults from 
ImpalaServer::default_query_options_.
+    TQueryOptions default_query_options;
+
+    /// BitSet indicating which query options in default_query_options have 
been
+    /// explicitly set in the session. Updated when a query option is 
specified using a
+    /// SET command: the bit corresponding to the TImpalaQueryOptions enum is 
set.
+    QueryOptionsMask set_query_options_mask;
+
+    /// For HS2 only, the protocol version this session is expecting.
+    apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version;
+
+    /// Inflight queries belonging to this session
+    boost::unordered_set<TUniqueId> inflight_queries;
+
+    /// Total number of queries run as part of this session.
+    int64_t total_queries;
+
+    /// Time the session was last accessed, in ms since epoch (UTC).
+    int64_t last_accessed_ms;
+
+    /// The latest Kudu timestamp observed after DML operations executed 
within this
+    /// session.
+    uint64_t kudu_latest_observed_ts;
+
+    /// Number of RPCs concurrently accessing this session state. Used to 
detect when a
+    /// session may be correctly expired after a timeout (when ref_count == 
0). Typically
+    /// at most one RPC will be issued against a session at a time, but 
clients may do
+    /// something unexpected and, for example, poll in one thread and fetch in 
another.
+    uint32_t ref_count;
+
+    /// Per-session idle timeout in seconds. Default value is 
FLAGS_idle_session_timeout.
+    /// It can be overridden with a smaller value via the option 
"idle_session_timeout"
+    /// when opening a HS2 session.
+    int32_t session_timeout;
+
+    /// Builds a Thrift representation of this SessionState for serialisation 
to
+    /// the frontend.
+    void ToThrift(const TUniqueId& session_id, TSessionState* session_state);
+  };
+
  private:
   friend class ChildQuery;
   friend class ImpalaHttpHandler;
 
   boost::scoped_ptr<ImpalaHttpHandler> http_handler_;
 
-  struct SessionState;
-
-  /// Execution state of a query.
-  class QueryExecState;
-
   /// Relevant ODBC SQL State code; for more info,
   /// goto http://msdn.microsoft.com/en-us/library/ms714687.aspx
   static const char* SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION;
@@ -301,8 +398,8 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
 
   /// Return exec state for given query_id, or NULL if not found.
   /// If 'lock' is true, the returned exec state's lock() will be acquired 
before
-  /// the query_exec_state_map_lock_ is released.
-  std::shared_ptr<QueryExecState> GetQueryExecState(
+  /// the client_request_state_map_lock_ is released.
+  std::shared_ptr<ClientRequestState> GetClientRequestState(
       const TUniqueId& query_id, bool lock);
 
   /// Writes the session id, if found, for the given query to the output
@@ -310,31 +407,30 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   bool GetSessionIdForQuery(const TUniqueId& query_id, TUniqueId* session_id);
 
   /// Updates the number of databases / tables metrics from the FE catalog
-  Status UpdateCatalogMetrics();
+  Status UpdateCatalogMetrics() WARN_UNUSED_RESULT;
 
-  /// Starts asynchronous execution of query. Creates QueryExecState (returned
+  /// Starts asynchronous execution of query. Creates ClientRequestState 
(returned
   /// in exec_state), registers it and calls Coordinator::Execute().
   /// If it returns with an error status, exec_state will be NULL and nothing
-  /// will have been registered in query_exec_state_map_.
+  /// will have been registered in client_request_state_map_.
   /// session_state is a ptr to the session running this query and must have
   /// been checked out.
   /// query_session_state is a snapshot of session state that changes when the
   /// query was run. (e.g. default database).
   Status Execute(TQueryCtx* query_ctx,
-                 std::shared_ptr<SessionState> session_state,
-                 std::shared_ptr<QueryExecState>* exec_state);
+      std::shared_ptr<SessionState> session_state,
+      std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
 
   /// Implements Execute() logic, but doesn't unregister query on error.
   Status ExecuteInternal(const TQueryCtx& query_ctx,
-                         std::shared_ptr<SessionState> session_state,
-                         bool* registered_exec_state,
-                         std::shared_ptr<QueryExecState>* exec_state);
+      std::shared_ptr<SessionState> session_state, bool* registered_exec_state,
+      std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
 
-  /// Registers the query exec state with query_exec_state_map_ using the 
globally
+  /// Registers the query exec state with client_request_state_map_ using the 
globally
   /// unique query_id and add the query id to session state's open query list.
   /// The caller must have checked out the session state.
   Status RegisterQuery(std::shared_ptr<SessionState> session_state,
-      const std::shared_ptr<QueryExecState>& exec_state);
+      const std::shared_ptr<ClientRequestState>& exec_state) 
WARN_UNUSED_RESULT;
 
   /// Adds the query to the set of in-flight queries for the session. The 
query remains
   /// in-flight until the query is unregistered.  Until a query is in-flight, 
an attempt
@@ -346,15 +442,15 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// The query must have already been registered using RegisterQuery().  The 
caller
   /// must have checked out the session state.
   Status SetQueryInflight(std::shared_ptr<SessionState> session_state,
-      const std::shared_ptr<QueryExecState>& exec_state);
+      const std::shared_ptr<ClientRequestState>& exec_state) 
WARN_UNUSED_RESULT;
 
   /// Unregister the query by cancelling it, removing exec_state from
-  /// query_exec_state_map_, and removing the query id from session state's 
in-flight
+  /// client_request_state_map_, and removing the query id from session 
state's in-flight
   /// query list.  If check_inflight is true, then return an error if the 
query is not
   /// yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight 
(for
   /// cleaning up after an error on the query issuing path).
   Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
-      const Status* cause = NULL);
+      const Status* cause = NULL) WARN_UNUSED_RESULT;
 
   /// Initiates query cancellation reporting the given cause as the query 
status.
   /// Assumes deliberate cancellation by the user if the cause is NULL.  
Returns an
@@ -363,13 +459,14 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// be unregistered, after cancellation.  Caller should not hold any locks 
when
   /// calling this function.
   Status CancelInternal(const TUniqueId& query_id, bool check_inflight,
-      const Status* cause = NULL);
+      const Status* cause = NULL) WARN_UNUSED_RESULT;
 
   /// Close the session and release all resource used by this session.
   /// Caller should not hold any locks when calling this function.
   /// If ignore_if_absent is true, returns OK even if a session with the 
supplied ID does
   /// not exist.
-  Status CloseSessionInternal(const TUniqueId& session_id, bool 
ignore_if_absent);
+  Status CloseSessionInternal(const TUniqueId& session_id, bool 
ignore_if_absent)
+      WARN_UNUSED_RESULT;
 
   /// Gets the runtime profile string for a given query_id and writes it to 
the output
   /// stream. First searches for the query id in the map of in-flight queries. 
If no
@@ -379,10 +476,11 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// If base64_encoded, outputs the base64 encoded profile output, otherwise 
the human
   /// readable string.
   Status GetRuntimeProfileStr(const TUniqueId& query_id, bool base64_encoded,
-      std::stringstream* output);
+      std::stringstream* output) WARN_UNUSED_RESULT;
 
   /// Returns the exec summary for this query.
-  Status GetExecSummary(const TUniqueId& query_id, TExecSummary* result);
+  Status GetExecSummary(const TUniqueId& query_id, TExecSummary* result)
+      WARN_UNUSED_RESULT;
 
   /// Initialize "default_configs_" to show the default values for 
ImpalaQueryOptions and
   /// "support_start_over/false" to indicate that Impala does not support 
start over
@@ -394,33 +492,35 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// Returns OK unless there is some problem preventing profile log files
   /// from being written. If an error is returned, the constructor will disable
   /// profile logging.
-  Status InitProfileLogging();
+  Status InitProfileLogging() WARN_UNUSED_RESULT;
 
   /// Checks settings for audit event logging, including whether the output
   /// directory exists and is writeable, and initialises the first log file.
   /// Returns OK unless there is some problem preventing audit event log files
   /// from being written. If an error is returned, impalad startup will be 
aborted.
-  Status InitAuditEventLogging();
+  Status InitAuditEventLogging() WARN_UNUSED_RESULT;
 
   /// Checks settings for lineage logging, including whether the output
   /// directory exists and is writeable, and initialises the first log file.
   /// Returns OK unless there is some problem preventing lineage log files
   /// from being written. If an error is returned, impalad startup will be 
aborted.
-  Status InitLineageLogging();
+  Status InitLineageLogging() WARN_UNUSED_RESULT;
 
   /// Initializes a logging directory, creating the directory if it does not 
already
   /// exist. If there is any error creating the directory an error will be 
returned.
-  static Status InitLoggingDir(const std::string& log_dir);
+  static Status InitLoggingDir(const std::string& log_dir) WARN_UNUSED_RESULT;
 
   /// Returns true if audit event logging is enabled, false otherwise.
   bool IsAuditEventLoggingEnabled();
 
-  Status LogAuditRecord(const QueryExecState& exec_state, const TExecRequest& 
request);
+  Status LogAuditRecord(
+      const ClientRequestState& exec_state, const TExecRequest& request)
+      WARN_UNUSED_RESULT;
 
-  Status LogLineageRecord(const QueryExecState& exec_state);
+  Status LogLineageRecord(const ClientRequestState& exec_state) 
WARN_UNUSED_RESULT;
 
   /// Log audit and column lineage events
-  void LogQueryEvents(const QueryExecState& exec_state);
+  void LogQueryEvents(const ClientRequestState& exec_state);
 
   /// Runs once every 5s to flush the profile log file to disk.
   [[noreturn]] void LogFileFlushThread();
@@ -432,14 +532,15 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   [[noreturn]] void LineageLoggerFlushThread();
 
   /// Copies a query's state into the query log. Called immediately prior to a
-  /// QueryExecState's deletion. Also writes the query profile to the profile 
log on disk.
-  /// Must be called with query_exec_state_map_lock_ held
-  void ArchiveQuery(const QueryExecState& query);
+  /// ClientRequestState's deletion. Also writes the query profile to the 
profile log
+  /// on disk. Must be called with client_request_state_map_lock_ held
+  void ArchiveQuery(const ClientRequestState& query);
 
   /// Checks whether the given user is allowed to delegate as the specified 
do_as_user.
   /// Returns OK if the authorization suceeds, otherwise returns an status 
with details
   /// on why the failure occurred.
-  Status AuthorizeProxyUser(const std::string& user, const std::string& 
do_as_user);
+  Status AuthorizeProxyUser(const std::string& user, const std::string& 
do_as_user)
+      WARN_UNUSED_RESULT;
 
   // Check if the local backend descriptor is in the list of known backends. 
If not, add
   // it to the list of known backends and add it to the 'topic_updates'.
@@ -518,7 +619,7 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
     /// otherwise leave this.profile empty.
     /// If encoded_str is non-empty, it is the base64 encoded string for
     /// exec_state->profile.
-    QueryStateRecord(const QueryExecState& exec_state, bool copy_profile = 
false,
+    QueryStateRecord(const ClientRequestState& exec_state, bool copy_profile = 
false,
         const std::string& encoded_str = "");
 
     /// Default constructor used only when participating in collections
@@ -533,7 +634,8 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// Beeswax private methods
 
   /// Helper functions to translate between Beeswax and Impala structs
-  Status QueryToTQueryContext(const beeswax::Query& query, TQueryCtx* 
query_ctx);
+  Status QueryToTQueryContext(const beeswax::Query& query, TQueryCtx* 
query_ctx)
+      WARN_UNUSED_RESULT;
   void TUniqueIdToQueryHandle(const TUniqueId& query_id, beeswax::QueryHandle* 
handle);
   void QueryHandleToTUniqueId(const beeswax::QueryHandle& handle, TUniqueId* 
query_id);
 
@@ -542,19 +644,21 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
 
   /// Executes the fetch logic. Doesn't clean up the exec state if an error 
occurs.
   Status FetchInternal(const TUniqueId& query_id, bool start_over,
-      int32_t fetch_size, beeswax::Results* query_results);
+      int32_t fetch_size, beeswax::Results* query_results) WARN_UNUSED_RESULT;
 
   /// Populate insert_result and clean up exec state. If the query
   /// status is an error, insert_result is not populated and the status is 
returned.
-  Status CloseInsertInternal(const TUniqueId& query_id, TInsertResult* 
insert_result);
+  Status CloseInsertInternal(const TUniqueId& query_id, TInsertResult* 
insert_result)
+      WARN_UNUSED_RESULT;
 
   /// HiveServer2 private methods (implemented in impala-hs2-server.cc)
 
   /// Starts the synchronous execution of a HiverServer2 metadata operation.
-  /// If the execution succeeds, an QueryExecState will be created and 
registered in
-  /// query_exec_state_map_. Otherwise, nothing will be registered in 
query_exec_state_map_
-  /// and an error status will be returned. As part of this call, the 
TMetadataOpRequest
-  /// struct will be populated with the requesting user's session state.
+  /// If the execution succeeds, a ClientRequestState will be created and registered in
+  /// client_request_state_map_. Otherwise, nothing will be registered in
+  /// client_request_state_map_ and an error status will be returned. As part 
of this
+  /// call, the TMetadataOpRequest struct will be populated with the 
requesting user's
+  /// session state.
   /// Returns a TOperationHandle and TStatus.
   void ExecuteMetadataOp(
       const apache::hive::service::cli::thrift::THandleIdentifier& 
session_handle,
@@ -566,20 +670,21 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// the query's state should be reset to fetch from the beginning of the 
result set.
   /// Doesn't clean up the exec state if an error occurs.
   Status FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool 
fetch_first,
-      apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results);
+      apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results)
+      WARN_UNUSED_RESULT;
 
   /// Helper functions to translate between HiveServer2 and Impala structs
 
   /// Returns !ok() if handle.guid.size() or handle.secret.size() != 16
   static Status THandleIdentifierToTUniqueId(
       const apache::hive::service::cli::thrift::THandleIdentifier& handle,
-      TUniqueId* unique_id, TUniqueId* secret);
+      TUniqueId* unique_id, TUniqueId* secret) WARN_UNUSED_RESULT;
   static void TUniqueIdToTHandleIdentifier(
       const TUniqueId& unique_id, const TUniqueId& secret,
       apache::hive::service::cli::thrift::THandleIdentifier* handle);
   Status TExecuteStatementReqToTQueryContext(
       const apache::hive::service::cli::thrift::TExecuteStatementReq 
execute_request,
-      TQueryCtx* query_ctx);
+      TQueryCtx* query_ctx) WARN_UNUSED_RESULT;
 
   /// Helper method to process cancellations that result from failed backends, 
called from
   /// the cancellation thread pool. The cancellation_work contains the query 
id to cancel
@@ -595,22 +700,6 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   void AddPoolQueryOptions(TQueryCtx* query_ctx,
       const QueryOptionsMask& override_options_mask);
 
-  /// Processes a CatalogUpdateResult returned from the CatalogServer and 
ensures
-  /// the update has been applied to the local impalad's catalog cache. If
-  /// wait_for_all_subscribers is true, this function will also wait until all
-  /// catalog topic subscribers have processed the update. Called from 
QueryExecState
-  /// after executing any statement that modifies the catalog.
-  /// If wait_for_all_subscribers is false AND if the TCatalogUpdateResult 
contains
-  /// TCatalogObject(s) to add and/or remove, this function will update the 
local cache
-  /// by directly calling UpdateCatalog() with the TCatalogObject results.
-  /// Otherwise this function will wait until the local impalad's catalog 
cache has been
-  /// updated from a statestore heartbeat that includes this catalog update's 
catalog
-  /// version. If wait_for_all_subscribers is true, this function also wait 
all other
-  /// catalog topic subscribers to process this update by checking the current
-  /// min_subscriber_topic_version included in each state store heartbeat.
-  Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& 
catalog_update_result,
-      bool wait_for_all_subscribers);
-
   /// Register timeout value upon opening a new session. This will wake up
   /// session_timeout_thread_ to update its poll period.
   void RegisterSessionTimeout(int32_t timeout);
@@ -680,104 +769,20 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// on every poll period expiration or when the poll period changes.
   boost::condition_variable session_timeout_cv_;
 
-  /// map from query id to exec state; QueryExecState is owned by us and 
referenced
+  /// map from query id to exec state; ClientRequestState is owned by us and 
referenced
   /// as a shared_ptr to allow asynchronous deletion
-  typedef boost::unordered_map<TUniqueId, std::shared_ptr<QueryExecState>>
-      QueryExecStateMap;
-  QueryExecStateMap query_exec_state_map_;
+  typedef boost::unordered_map<TUniqueId, std::shared_ptr<ClientRequestState>>
+      ClientRequestStateMap;
+  ClientRequestStateMap client_request_state_map_;
 
-  /// Protects query_exec_state_map_. See "Locking" in the class comment for 
lock
+  /// Protects client_request_state_map_. See "Locking" in the class comment 
for lock
   /// acquisition order.
-  boost::mutex query_exec_state_map_lock_;
+  boost::mutex client_request_state_map_lock_;
 
   /// Default query options in the form of TQueryOptions and 
beeswax::ConfigVariable
   TQueryOptions default_query_options_;
   std::vector<beeswax::ConfigVariable> default_configs_;
 
-  /// Per-session state.  This object is reference counted using shared_ptrs.  
There
-  /// is one ref count in the SessionStateMap for as long as the session is 
active.
-  /// All queries running from this session also have a reference.
-  struct SessionState {
-    /// The default hs2_version must be V1 so that child queries (which use 
HS2, but may
-    /// run as children of Beeswax sessions) get results back in the expected 
format -
-    /// child queries inherit the HS2 version from their parents, and a 
Beeswax session
-    /// will never update the HS2 version from the default.
-    SessionState() : closed(false), expired(false),
-        hs2_version(apache::hive::service::cli::thrift::
-        TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), 
ref_count(0) {
-    }
-
-    TSessionType::type session_type;
-
-    /// Time the session was created, in ms since epoch (UTC).
-    int64_t start_time_ms;
-
-    /// Connected user for this session, i.e. the user which originated this 
session.
-    std::string connected_user;
-
-    /// The user to delegate to. Empty for no delegation.
-    std::string do_as_user;
-
-    /// Client network address.
-    TNetworkAddress network_address;
-
-    /// Protects all fields below. See "Locking" in the class comment for lock
-    /// acquisition order.
-    boost::mutex lock;
-
-    /// If true, the session has been closed.
-    bool closed;
-
-    /// If true, the session was idle for too long and has been expired. Only 
set when
-    /// ref_count == 0, after which point ref_count should never become 
non-zero (since
-    /// clients will use ScopedSessionState to access the session, which will 
prevent use
-    /// after expiration).
-    bool expired;
-
-    /// The default database (changed as a result of 'use' query execution).
-    std::string database;
-
-    /// The default query options of this session. When the session is 
created, the
-    /// session inherits the global defaults from 
ImpalaServer::default_query_options_.
-    TQueryOptions default_query_options;
-
-    /// BitSet indicating which query options in default_query_options have 
been
-    /// explicitly set in the session. Updated when a query option is 
specified using a
-    /// SET command: the bit corresponding to the TImpalaQueryOptions enum is 
set.
-    QueryOptionsMask set_query_options_mask;
-
-    /// For HS2 only, the protocol version this session is expecting.
-    apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version;
-
-    /// Inflight queries belonging to this session
-    boost::unordered_set<TUniqueId> inflight_queries;
-
-    /// Total number of queries run as part of this session.
-    int64_t total_queries;
-
-    /// Time the session was last accessed, in ms since epoch (UTC).
-    int64_t last_accessed_ms;
-
-    /// The latest Kudu timestamp observed after DML operations executed 
within this
-    /// session.
-    uint64_t kudu_latest_observed_ts;
-
-    /// Number of RPCs concurrently accessing this session state. Used to 
detect when a
-    /// session may be correctly expired after a timeout (when ref_count == 
0). Typically
-    /// at most one RPC will be issued against a session at a time, but 
clients may do
-    /// something unexpected and, for example, poll in one thread and fetch in 
another.
-    uint32_t ref_count;
-
-    /// Per-session idle timeout in seconds. Default value is 
FLAGS_idle_session_timeout.
-    /// It can be overridden with a smaller value via the option 
"idle_session_timeout"
-    /// when opening a HS2 session.
-    int32_t session_timeout;
-
-    /// Builds a Thrift representation of this SessionState for serialisation 
to
-    /// the frontend.
-    void ToThrift(const TUniqueId& session_id, TSessionState* session_state);
-  };
-
   /// Class that allows users of SessionState to mark a session as in-use, and 
therefore
   /// immune to expiration. The marking is done in WithSession() and undone in 
the
   /// destructor, so this class can be used to 'check-out' a session for the 
duration of a
@@ -790,7 +795,7 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
     /// object goes out of scope. Returns OK unless there is an error in 
GetSessionState.
     /// Must only be called once per ScopedSessionState.
     Status WithSession(const TUniqueId& session_id,
-        std::shared_ptr<SessionState>* session = NULL) {
+        std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
       DCHECK(session_.get() == NULL);
       RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
       if (session != NULL) (*session) = session_;
@@ -840,7 +845,8 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// If mark_active is true, also checks if the session is expired or closed 
and
   /// increments the session's reference counter if it is still alive.
   Status GetSessionState(const TUniqueId& session_id,
-      std::shared_ptr<SessionState>* session_state, bool mark_active = false);
+      std::shared_ptr<SessionState>* session_state, bool mark_active = false)
+      WARN_UNUSED_RESULT;
 
   /// Decrement the session's reference counter and mark last_accessed_ms so 
that state
   /// expiration can proceed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc 
b/be/src/service/query-exec-state.cc
deleted file mode 100644
index 72dba6f..0000000
--- a/be/src/service/query-exec-state.cc
+++ /dev/null
@@ -1,1084 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "service/query-exec-state.h"
-
-#include <limits>
-#include <gutil/strings/substitute.h>
-
-#include "exprs/expr-context.h"
-#include "exprs/expr.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/exec-env.h"
-#include "scheduling/admission-controller.h"
-#include "scheduling/scheduler.h"
-#include "service/frontend.h"
-#include "service/impala-server.h"
-#include "service/query-options.h"
-#include "service/query-result-set.h"
-#include "util/debug-util.h"
-#include "util/impalad-metrics.h"
-#include "util/runtime-profile-counters.h"
-#include "util/time.h"
-
-#include "gen-cpp/CatalogService.h"
-#include "gen-cpp/CatalogService_types.h"
-
-#include <thrift/Thrift.h>
-
-#include "common/names.h"
-
-using boost::algorithm::join;
-using namespace apache::hive::service::cli::thrift;
-using namespace apache::thrift;
-using namespace beeswax;
-using namespace strings;
-
-DECLARE_int32(catalog_service_port);
-DECLARE_string(catalog_service_host);
-DECLARE_int64(max_result_cache_size);
-
-namespace impala {
-
-// Keys into the info string map of the runtime profile referring to specific
-// items used by CM for monitoring purposes.
-static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
-static const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory 
Reservation";
-static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
-static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table 
Stats";
-static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing 
Disk Ids";
-
-ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx,
-    ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server,
-    shared_ptr<SessionState> session)
-  : query_ctx_(query_ctx),
-    last_active_time_ms_(numeric_limits<int64_t>::max()),
-    ref_count_(0L),
-    child_query_executor_(new ChildQueryExecutor),
-    exec_env_(exec_env),
-    is_block_on_wait_joining_(false),
-    session_(session),
-    schedule_(NULL),
-    coord_(NULL),
-    result_cache_max_size_(-1),
-    profile_(&profile_pool_, "Query"), // assign name w/ id after planning
-    server_profile_(&profile_pool_, "ImpalaServer"),
-    summary_profile_(&profile_pool_, "Summary"),
-    is_cancelled_(false),
-    eos_(false),
-    query_state_(beeswax::QueryState::CREATED),
-    current_batch_(NULL),
-    current_batch_row_(0),
-    num_rows_fetched_(0),
-    fetched_rows_(false),
-    frontend_(frontend),
-    parent_server_(server),
-    start_time_(TimestampValue::LocalTime()) {
-#ifndef NDEBUG
-  profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while 
running a "
-      "DEBUG build of Impala. Use RELEASE builds to measure query 
performance.");
-#endif
-  row_materialization_timer_ = ADD_TIMER(&server_profile_, 
"RowMaterializationTimer");
-  client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
-  query_events_ = summary_profile_.AddEventSequence("Query Timeline");
-  query_events_->Start();
-  profile_.AddChild(&summary_profile_);
-
-  profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
-  summary_profile_.AddInfoString("Session ID", PrintId(session_id()));
-  summary_profile_.AddInfoString("Session Type", 
PrintTSessionType(session_type()));
-  if (session_type() == TSessionType::HIVESERVER2) {
-    summary_profile_.AddInfoString("HiveServer2 Protocol Version",
-        Substitute("V$0", 1 + session->hs2_version));
-  }
-  summary_profile_.AddInfoString("Start Time", start_time().DebugString());
-  summary_profile_.AddInfoString("End Time", "");
-  summary_profile_.AddInfoString("Query Type", "N/A");
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
-  summary_profile_.AddInfoString("Query Status", "OK");
-  summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact 
*/ true));
-  summary_profile_.AddInfoString("User", effective_user());
-  summary_profile_.AddInfoString("Connected User", connected_user());
-  summary_profile_.AddInfoString("Delegated User", do_as_user());
-  summary_profile_.AddInfoString("Network Address",
-      lexical_cast<string>(session_->network_address));
-  summary_profile_.AddInfoString("Default Db", default_db());
-  summary_profile_.AddInfoString("Sql Statement", 
query_ctx_.client_request.stmt);
-  summary_profile_.AddInfoString("Coordinator",
-      TNetworkAddressToString(exec_env->backend_address()));
-}
-
-ImpalaServer::QueryExecState::~QueryExecState() {
-  DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!";
-}
-
-Status ImpalaServer::QueryExecState::SetResultCache(QueryResultSet* cache,
-    int64_t max_size) {
-  lock_guard<mutex> l(lock_);
-  DCHECK(result_cache_ == NULL);
-  result_cache_.reset(cache);
-  if (max_size > FLAGS_max_result_cache_size) {
-    return Status(
-        Substitute("Requested result-cache size of $0 exceeds Impala's maximum 
of $1.",
-            max_size, FLAGS_max_result_cache_size));
-  }
-  result_cache_max_size_ = max_size;
-  return Status::OK();
-}
-
-Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
-  MarkActive();
-  exec_request_ = *exec_request;
-
-  profile_.AddChild(&server_profile_);
-  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
-  summary_profile_.AddInfoString("Query Options (non default)",
-      DebugQueryOptions(query_ctx_.client_request.query_options));
-
-  switch (exec_request->stmt_type) {
-    case TStmtType::QUERY:
-    case TStmtType::DML:
-      DCHECK(exec_request_.__isset.query_exec_request);
-      return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
-    case TStmtType::EXPLAIN: {
-      request_result_set_.reset(new vector<TResultRow>(
-          exec_request_.explain_result.results));
-      return Status::OK();
-    }
-    case TStmtType::DDL: {
-      DCHECK(exec_request_.__isset.catalog_op_request);
-      return ExecDdlRequest();
-    }
-    case TStmtType::LOAD: {
-      DCHECK(exec_request_.__isset.load_data_request);
-      TLoadDataResp response;
-      RETURN_IF_ERROR(
-          frontend_->LoadData(exec_request_.load_data_request, &response));
-      request_result_set_.reset(new vector<TResultRow>);
-      request_result_set_->push_back(response.load_summary);
-
-      // Now refresh the table metadata.
-      TCatalogOpRequest reset_req;
-      reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
-      reset_req.__set_reset_metadata_params(TResetMetadataRequest());
-      
reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
-      reset_req.reset_metadata_params.__set_is_refresh(true);
-      reset_req.reset_metadata_params.__set_table_name(
-          exec_request_.load_data_request.table_name);
-      catalog_op_executor_.reset(
-          new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
-      RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
-      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
-          *catalog_op_executor_->update_catalog_result(),
-          exec_request_.query_options.sync_ddl));
-      return Status::OK();
-    }
-    case TStmtType::SET: {
-      DCHECK(exec_request_.__isset.set_query_option_request);
-      lock_guard<mutex> l(session_->lock);
-      if (exec_request_.set_query_option_request.__isset.key) {
-        // "SET key=value" updates the session query options.
-        DCHECK(exec_request_.set_query_option_request.__isset.value);
-        RETURN_IF_ERROR(SetQueryOption(
-            exec_request_.set_query_option_request.key,
-            exec_request_.set_query_option_request.value,
-            &session_->default_query_options,
-            &session_->set_query_options_mask));
-        SetResultSet({}, {});
-      } else {
-        // "SET" returns a table of all query options.
-        map<string, string> config;
-        TQueryOptionsToMap(
-            session_->default_query_options, &config);
-        vector<string> keys, values;
-        map<string, string>::const_iterator itr = config.begin();
-        for (; itr != config.end(); ++itr) {
-          keys.push_back(itr->first);
-          values.push_back(itr->second);
-        }
-        SetResultSet(keys, values);
-      }
-      return Status::OK();
-    }
-    default:
-      stringstream errmsg;
-      errmsg << "Unknown  exec request stmt type: " << exec_request_.stmt_type;
-      return Status(errmsg.str());
-  }
-}
-
-Status ImpalaServer::QueryExecState::ExecLocalCatalogOp(
-    const TCatalogOpRequest& catalog_op) {
-  switch (catalog_op.op_type) {
-    case TCatalogOpType::USE: {
-      lock_guard<mutex> l(session_->lock);
-      session_->database = exec_request_.catalog_op_request.use_db_params.db;
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_TABLES: {
-      const TShowTablesParams* params = &catalog_op.show_tables_params;
-      // A NULL pattern means match all tables. However, Thrift string types 
can't
-      // be NULL in C++, so we have to test if it's set rather than just 
blindly
-      // using the value.
-      const string* table_name =
-          params->__isset.show_pattern ? &(params->show_pattern) : NULL;
-      TGetTablesResult table_names;
-      RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name,
-          &query_ctx_.session, &table_names));
-      SetResultSet(table_names.tables);
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_DBS: {
-      const TShowDbsParams* params = &catalog_op.show_dbs_params;
-      TGetDbsResult dbs;
-      const string* db_pattern =
-          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
-      RETURN_IF_ERROR(
-          frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs));
-      vector<string> names, comments;
-      names.reserve(dbs.dbs.size());
-      comments.reserve(dbs.dbs.size());
-      for (const TDatabase& db: dbs.dbs) {
-        names.push_back(db.db_name);
-        comments.push_back(db.metastore_db.description);
-      }
-      SetResultSet(names, comments);
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_DATA_SRCS: {
-      const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
-      TGetDataSrcsResult result;
-      const string* pattern =
-          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
-      RETURN_IF_ERROR(
-          frontend_->GetDataSrcMetadata(pattern, &result));
-      SetResultSet(result.data_src_names, result.locations, result.class_names,
-          result.api_versions);
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_STATS: {
-      const TShowStatsParams& params = catalog_op.show_stats_params;
-      TResultSet response;
-      RETURN_IF_ERROR(frontend_->GetStats(params, &response));
-      // Set the result set and its schema from the response.
-      request_result_set_.reset(new vector<TResultRow>(response.rows));
-      result_metadata_ = response.schema;
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_FUNCTIONS: {
-      const TShowFunctionsParams* params = &catalog_op.show_fns_params;
-      TGetFunctionsResult functions;
-      const string* fn_pattern =
-          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
-      RETURN_IF_ERROR(frontend_->GetFunctions(
-          params->category, params->db, fn_pattern, &query_ctx_.session, 
&functions));
-      SetResultSet(functions.fn_ret_types, functions.fn_signatures,
-          functions.fn_binary_types, functions.fn_persistence);
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_ROLES: {
-      const TShowRolesParams& params = catalog_op.show_roles_params;
-      if (params.is_admin_op) {
-        // Verify the user has privileges to perform this operation by 
checking against
-        // the Sentry Service (via the Catalog Server).
-        catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-            &server_profile_));
-
-        TSentryAdminCheckRequest req;
-        req.__set_header(TCatalogServiceRequestHeader());
-        req.header.__set_requesting_user(effective_user());
-        RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
-      }
-
-      // If we have made it here, the user has privileges to execute this 
operation.
-      // Return the results.
-      TShowRolesResult result;
-      RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
-      SetResultSet(result.role_names);
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_GRANT_ROLE: {
-      const TShowGrantRoleParams& params = catalog_op.show_grant_role_params;
-      if (params.is_admin_op) {
-        // Verify the user has privileges to perform this operation by 
checking against
-        // the Sentry Service (via the Catalog Server).
-        catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-            &server_profile_));
-
-        TSentryAdminCheckRequest req;
-        req.__set_header(TCatalogServiceRequestHeader());
-        req.header.__set_requesting_user(effective_user());
-        RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
-      }
-
-      TResultSet response;
-      RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response));
-      // Set the result set and its schema from the response.
-      request_result_set_.reset(new vector<TResultRow>(response.rows));
-      result_metadata_ = response.schema;
-      return Status::OK();
-    }
-    case TCatalogOpType::DESCRIBE_DB: {
-      TDescribeResult response;
-      RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params,
-          &response));
-      // Set the result set
-      request_result_set_.reset(new vector<TResultRow>(response.results));
-      return Status::OK();
-    }
-    case TCatalogOpType::DESCRIBE_TABLE: {
-      TDescribeResult response;
-      
RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params,
-          &response));
-      // Set the result set
-      request_result_set_.reset(new vector<TResultRow>(response.results));
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_CREATE_TABLE: {
-      string response;
-      
RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
-          &response));
-      SetResultSet(vector<string>(1, response));
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_CREATE_FUNCTION: {
-      string response;
-      
RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params,
-          &response));
-      SetResultSet(vector<string>(1, response));
-      return Status::OK();
-    }
-    case TCatalogOpType::SHOW_FILES: {
-      TResultSet response;
-      RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, 
&response));
-      // Set the result set and its schema from the response.
-      request_result_set_.reset(new vector<TResultRow>(response.rows));
-      result_metadata_ = response.schema;
-      return Status::OK();
-    }
-    default: {
-      stringstream ss;
-      ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
-      return Status(ss.str());
-    }
-  }
-}
-
-Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
-    const TQueryExecRequest& query_exec_request) {
-  // we always need at least one plan fragment
-  DCHECK(query_exec_request.plan_exec_info.size() > 0);
-
-  if (query_exec_request.__isset.query_plan) {
-    stringstream plan_ss;
-    // Add some delimiters to make it clearer where the plan
-    // begins and the profile ends
-    plan_ss << "\n----------------\n"
-            << query_exec_request.query_plan
-            << "----------------";
-    summary_profile_.AddInfoString("Plan", plan_ss.str());
-  }
-  // Add info strings consumed by CM: Estimated mem and tables missing stats.
-  if (query_exec_request.__isset.per_host_mem_estimate) {
-    stringstream ss;
-    ss << query_exec_request.per_host_mem_estimate;
-    summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
-  }
-  if (query_exec_request.__isset.per_host_min_reservation) {
-    stringstream ss;
-    ss << query_exec_request.per_host_min_reservation;
-    summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str());
-  }
-  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
-      query_exec_request.query_ctx.__isset.tables_missing_stats &&
-      !query_exec_request.query_ctx.tables_missing_stats.empty()) {
-    stringstream ss;
-    const vector<TTableName>& tbls = 
query_exec_request.query_ctx.tables_missing_stats;
-    for (int i = 0; i < tbls.size(); ++i) {
-      if (i != 0) ss << ",";
-      ss << tbls[i].db_name << "." << tbls[i].table_name;
-    }
-    summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
-  }
-
-  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
-      query_exec_request.query_ctx.__isset.tables_with_corrupt_stats &&
-      !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) {
-    stringstream ss;
-    const vector<TTableName>& tbls =
-        query_exec_request.query_ctx.tables_with_corrupt_stats;
-    for (int i = 0; i < tbls.size(); ++i) {
-      if (i != 0) ss << ",";
-      ss << tbls[i].db_name << "." << tbls[i].table_name;
-    }
-    summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
-  }
-
-  if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
-      !query_exec_request.query_ctx.tables_missing_diskids.empty()) {
-    stringstream ss;
-    const vector<TTableName>& tbls =
-        query_exec_request.query_ctx.tables_missing_diskids;
-    for (int i = 0; i < tbls.size(); ++i) {
-      if (i != 0) ss << ",";
-      ss << tbls[i].db_name << "." << tbls[i].table_name;
-    }
-    summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
-  }
-
-  {
-    lock_guard<mutex> l(lock_);
-    // Don't start executing the query if Cancel() was called concurrently 
with Exec().
-    if (is_cancelled_) return Status::CANCELLED;
-    // TODO: make schedule local to coordinator and move schedule_->Release() 
into
-    // Coordinator::TearDown()
-    schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
-        exec_request_.query_options, &summary_profile_, query_events_));
-  }
-  Status status = exec_env_->scheduler()->Schedule(schedule_.get());
-  {
-    lock_guard<mutex> l(lock_);
-    RETURN_IF_ERROR(UpdateQueryStatus(status));
-  }
-
-  if (exec_env_->admission_controller() != nullptr) {
-    status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
-    {
-      lock_guard<mutex> l(lock_);
-      RETURN_IF_ERROR(UpdateQueryStatus(status));
-    }
-  }
-
-  coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
-  status = coord_->Exec();
-  {
-    lock_guard<mutex> l(lock_);
-    RETURN_IF_ERROR(UpdateQueryStatus(status));
-  }
-
-  profile_.AddChild(coord_->query_profile());
-  return Status::OK();
-}
-
-Status ImpalaServer::QueryExecState::ExecDdlRequest() {
-  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
-      PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
-  summary_profile_.AddInfoString("DDL Type", op_type);
-
-  if (catalog_op_type() != TCatalogOpType::DDL &&
-      catalog_op_type() != TCatalogOpType::RESET_METADATA) {
-    Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request);
-    lock_guard<mutex> l(lock_);
-    return UpdateQueryStatus(status);
-  }
-
-  if (ddl_type() == TDdlType::COMPUTE_STATS) {
-    TComputeStatsParams& compute_stats_params =
-        exec_request_.catalog_op_request.ddl_params.compute_stats_params;
-    // Add child queries for computing table and column stats.
-    vector<ChildQuery> child_queries;
-    if (compute_stats_params.__isset.tbl_stats_query) {
-      child_queries.push_back(
-          ChildQuery(compute_stats_params.tbl_stats_query, this, 
parent_server_));
-    }
-    if (compute_stats_params.__isset.col_stats_query) {
-      child_queries.push_back(
-          ChildQuery(compute_stats_params.col_stats_query, this, 
parent_server_));
-    }
-
-    if (child_queries.size() > 0) 
child_query_executor_->ExecAsync(move(child_queries));
-    return Status::OK();
-  }
-
-  catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
-      &server_profile_));
-  Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
-  {
-    lock_guard<mutex> l(lock_);
-    RETURN_IF_ERROR(UpdateQueryStatus(status));
-  }
-
-  // If this is a CTAS request, there will usually be more work to do
-  // after executing the CREATE TABLE statement (the INSERT portion of the 
operation).
-  // The exception is if the user specified IF NOT EXISTS and the table already
-  // existed, in which case we do not execute the INSERT.
-  if (catalog_op_type() == TCatalogOpType::DDL &&
-      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
-      !catalog_op_executor_->ddl_exec_response()->new_table_created) {
-    DCHECK(exec_request_.catalog_op_request.
-        ddl_params.create_table_params.if_not_exists);
-    return Status::OK();
-  }
-
-  // Add newly created table to catalog cache.
-  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
-      *catalog_op_executor_->update_catalog_result(),
-      exec_request_.query_options.sync_ddl));
-
-  if (catalog_op_type() == TCatalogOpType::DDL &&
-      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
-    // At this point, the remainder of the CTAS request executes
-    // like a normal DML request. As with other DML requests, it will
-    // wait for another catalog update if any partitions were altered as a 
result
-    // of the operation.
-    DCHECK(exec_request_.__isset.query_exec_request);
-    RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request));
-  }
-
-  // Set the results to be reported to the client.
-  SetResultSet(catalog_op_executor_->ddl_exec_response());
-  return Status::OK();
-}
-
-void ImpalaServer::QueryExecState::Done() {
-  MarkActive();
-  // Make sure we join on wait_thread_ before we finish (and especially before 
this object
-  // is destroyed).
-  BlockOnWait();
-
-  // Update latest observed Kudu timestamp stored in the session from the 
coordinator.
-  // Needs to take the session_ lock which must not be taken while holding 
lock_, so this
-  // must happen before taking lock_ below.
-  if (coord_.get() != NULL) {
-    // This is safe to access on coord_ after Wait() has been called.
-    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
-    if (latest_kudu_ts > 0) {
-      VLOG_RPC << "Updating session (id=" << session_id()  << ") with latest "
-               << "observed Kudu timestamp: " << latest_kudu_ts;
-      lock_guard<mutex> session_lock(session_->lock);
-      session_->kudu_latest_observed_ts = std::max<uint64_t>(
-          session_->kudu_latest_observed_ts, latest_kudu_ts);
-    }
-  }
-
-  unique_lock<mutex> l(lock_);
-  end_time_ = TimestampValue::LocalTime();
-  summary_profile_.AddInfoString("End Time", end_time().DebugString());
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
-  query_events_->MarkEvent("Unregister query");
-
-  // Update result set cache metrics, and update mem limit accounting before 
tearing
-  // down the coordinator.
-  ClearResultCache();
-
-  if (coord_.get() != NULL) {
-    // Release any reserved resources.
-    if (exec_env_->admission_controller() != nullptr) {
-      Status status = 
exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
-      if (!status.ok()) {
-        LOG(WARNING) << "Failed to release resources of query " << 
schedule_->query_id()
-                     << " because of error: " << status.GetDetail();
-      }
-    }
-    coord_->TearDown();
-  }
-}
-
-Status ImpalaServer::QueryExecState::Exec(const TMetadataOpRequest& 
exec_request) {
-  TResultSet metadata_op_result;
-  // Like the other Exec(), fill out as much profile information as we're able 
to.
-  summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
-  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
-  RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
-      &metadata_op_result));
-  result_metadata_ = metadata_op_result.schema;
-  request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
-  return Status::OK();
-}
-
-void ImpalaServer::QueryExecState::WaitAsync() {
-  wait_thread_.reset(new Thread(
-      "query-exec-state", "wait-thread", &ImpalaServer::QueryExecState::Wait, 
this));
-}
-
-void ImpalaServer::QueryExecState::BlockOnWait() {
-  unique_lock<mutex> l(lock_);
-  if (wait_thread_.get() == NULL) return;
-  if (!is_block_on_wait_joining_) {
-    // No other thread is already joining on wait_thread_, so this thread 
needs to do
-    // it.  Other threads will need to block on the cond-var.
-    is_block_on_wait_joining_ = true;
-    l.unlock();
-    wait_thread_->Join();
-    l.lock();
-    is_block_on_wait_joining_ = false;
-    wait_thread_.reset();
-    block_on_wait_cv_.notify_all();
-  } else {
-    // Another thread is already joining with wait_thread_.  Block on the 
cond-var
-    // until the Join() executed in the other thread has completed.
-    do {
-      block_on_wait_cv_.wait(l);
-    } while (is_block_on_wait_joining_);
-  }
-}
-
-void ImpalaServer::QueryExecState::Wait() {
-  // block until results are ready
-  Status status = WaitInternal();
-  {
-    lock_guard<mutex> l(lock_);
-    if (returns_result_set()) {
-      query_events()->MarkEvent("Rows available");
-    } else {
-      query_events()->MarkEvent("Request finished");
-    }
-    UpdateQueryStatus(status);
-  }
-  if (status.ok()) {
-    UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
-  }
-}
-
-Status ImpalaServer::QueryExecState::WaitInternal() {
-  // Explain requests have already populated the result set. Nothing to do 
here.
-  if (exec_request_.stmt_type == TStmtType::EXPLAIN) {
-    MarkInactive();
-    return Status::OK();
-  }
-
-  vector<ChildQuery*> child_queries;
-  Status child_queries_status = 
child_query_executor_->WaitForAll(&child_queries);
-  {
-    lock_guard<mutex> l(lock_);
-    RETURN_IF_ERROR(query_status_);
-    RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
-  }
-  if (!child_queries.empty()) query_events_->MarkEvent("Child queries 
finished");
-
-  if (coord_.get() != NULL) {
-    RETURN_IF_ERROR(coord_->Wait());
-    RETURN_IF_ERROR(UpdateCatalog());
-  }
-
-  if (catalog_op_type() == TCatalogOpType::DDL &&
-      ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
-    RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
-  }
-
-  if (!returns_result_set()) {
-    // Queries that do not return a result are finished at this point. This 
includes
-    // DML operations and a subset of the DDL operations.
-    eos_ = true;
-  } else if (catalog_op_type() == TCatalogOpType::DDL &&
-      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
-    SetCreateTableAsSelectResultSet();
-  }
-  // Rows are available now (for SELECT statement), so start the 'wait' timer 
that tracks
-  // how long Impala waits for the client to fetch rows. For other statements, 
track the
-  // time until a Close() is received.
-  MarkInactive();
-  return Status::OK();
-}
-
-Status ImpalaServer::QueryExecState::FetchRows(const int32_t max_rows,
-    QueryResultSet* fetched_rows) {
-  // Pause the wait timer, since the client has instructed us to do work on 
its behalf.
-  MarkActive();
-
-  // ImpalaServer::FetchInternal has already taken our lock_
-  UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows));
-
-  MarkInactive();
-  return query_status_;
-}
-
-Status ImpalaServer::QueryExecState::RestartFetch() {
-  // No result caching for this query. Restart is invalid.
-  if (result_cache_max_size_ <= 0) {
-    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
-        "Restarting of fetch requires enabling of query result caching."));
-  }
-  // The cache overflowed on a previous fetch.
-  if (result_cache_.get() == NULL) {
-    stringstream ss;
-    ss << "The query result cache exceeded its limit of " << 
result_cache_max_size_
-       << " rows. Restarting the fetch is not possible.";
-    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str()));
-  }
-  // Reset fetch state to start over.
-  eos_ = false;
-  num_rows_fetched_ = 0;
-  return Status::OK();
-}
-
-void ImpalaServer::QueryExecState::UpdateNonErrorQueryState(
-    beeswax::QueryState::type query_state) {
-  lock_guard<mutex> l(lock_);
-  DCHECK(query_state != beeswax::QueryState::EXCEPTION);
-  if (query_state_ < query_state) query_state_ = query_state;
-}
-
-Status ImpalaServer::QueryExecState::UpdateQueryStatus(const Status& status) {
-  // Preserve the first non-ok status
-  if (!status.ok() && query_status_.ok()) {
-    query_state_ = beeswax::QueryState::EXCEPTION;
-    query_status_ = status;
-    summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
-  }
-
-  return status;
-}
-
// Fetches up to 'max_rows' rows (max_rows <= 0 means no limit) into 'fetched_rows'.
// Rows are served, in order of preference, from: the request-level result set
// (DDL-style statements), the client-facing result cache, and finally the
// coordinator. Caller holds fetch_rows_lock_ and lock_; lock_ is temporarily
// released around coord_->GetNext() so Cancel() is not blocked.
// Returns a non-OK status if the fetch fails or the query entered an error state.
Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
    QueryResultSet* fetched_rows) {
  DCHECK(query_state_ != beeswax::QueryState::EXCEPTION);

  // All rows already returned: nothing more to do.
  if (eos_) return Status::OK();

  // Case 1: results were produced directly by the request (e.g. DDL), not by a
  // coordinator. Serve them straight out of request_result_set_.
  if (request_result_set_ != NULL) {
    query_state_ = beeswax::QueryState::FINISHED;
    int num_rows = 0;
    const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
    // max_rows <= 0 means no limit
    while ((num_rows < max_rows || max_rows <= 0)
        && num_rows_fetched_ < all_rows.size()) {
      fetched_rows->AddOneRow(all_rows[num_rows_fetched_]);
      ++num_rows_fetched_;
      ++num_rows;
    }
    eos_ = (num_rows_fetched_ == all_rows.size());
    return Status::OK();
  }

  if (coord_.get() == nullptr) {
    return Status("Client tried to fetch rows on a query that produces no results.");
  }

  // Case 2: serve as much of the fetch as possible from the result cache, which
  // holds rows previously fetched so that a client can restart the fetch.
  int32_t num_rows_fetched_from_cache = 0;
  if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
    // Satisfy the fetch from the result cache if possible.
    int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
    num_rows_fetched_from_cache =
        fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
    num_rows_fetched_ += num_rows_fetched_from_cache;
    // NOTE(review): when max_rows == 0 this condition (0 >= 0) returns without
    // ever consulting the coordinator — presumably callers use max_rows < 0 (or
    // a positive value) for "no limit"; confirm against call sites.
    if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
  }

  query_state_ = beeswax::QueryState::FINISHED;  // results will be ready after this call

  // Maximum number of rows to be fetched from the coord.
  int32_t max_coord_rows = max_rows;
  if (max_rows > 0) {
    DCHECK_LE(num_rows_fetched_from_cache, max_rows);
    max_coord_rows = max_rows - num_rows_fetched_from_cache;
  }
  // Case 3: fetch the remaining rows from the coordinator.
  {
    SCOPED_TIMER(row_materialization_timer_);
    size_t before = fetched_rows->size();
    // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
    // (already held) ensures that we do not call coord_->GetNext() multiple times
    // concurrently.
    // TODO: Simplify this.
    lock_.unlock();
    Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_);
    lock_.lock();
    int num_fetched = fetched_rows->size() - before;
    DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
        "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
    num_rows_fetched_ += num_fetched;

    RETURN_IF_ERROR(status);
    // Check if query status has changed during GetNext() call
    if (!query_status_.ok()) {
      eos_ = true;
      return query_status_;
    }
  }

  // Update the result cache if necessary: append the rows just fetched from the
  // coordinator so a later RestartFetch() can replay them.
  if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
    int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache;
    if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
      // Set the cache to NULL to indicate that adding the rows fetched from the coord
      // would exceed the bound of the cache, and therefore, RestartFetch() should fail.
      ClearResultCache();
      return Status::OK();
    }

    // We guess the size of the cache after adding fetched_rows by looking at the size of
    // fetched_rows itself, and using this estimate to confirm that the memtracker will
    // allow us to use this much extra memory. In fact, this might be an overestimate, as
    // the size of two result sets combined into one is not always the size of both result
    // sets added together (the best example is the null bitset for each column: it might
    // have only one entry in each result set, and as a result consume two bytes, but when
    // the result sets are combined, only one byte is needed). Therefore after we add the
    // new result set into the cache, we need to fix up the memory consumption to the
    // actual levels to ensure we don't 'leak' bytes that we aren't using.
    int64_t before = result_cache_->ByteSize();

    // Upper-bound on memory required to add fetched_rows to the cache.
    int64_t delta_bytes =
        fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
    MemTracker* query_mem_tracker = coord_->query_mem_tracker();
    // Count the cached rows towards the mem limit.
    if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
      string details("Failed to allocate memory for result cache.");
      return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details,
          delta_bytes);
    }
    // Append all rows fetched from the coordinator into the cache.
    int num_rows_added = result_cache_->AddRows(
        fetched_rows, num_rows_fetched_from_cache, fetched_rows->size());

    int64_t after = result_cache_->ByteSize();

    // Confirm that this was not an underestimate of the memory required.
    DCHECK_GE(before + delta_bytes, after)
        << "Combined result sets consume more memory than both individually "
        << Substitute("(before: $0, delta_bytes: $1, after: $2)",
            before, delta_bytes, after);

    // Fix up the tracked values: release the over-reservation so the tracker
    // reflects the cache's actual byte size.
    if (before + delta_bytes > after) {
      query_mem_tracker->Release(before + delta_bytes - after);
      delta_bytes = after - before;
    }

    // Update result set cache metrics.
    ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added);
    ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes);
  }

  return Status::OK();
}
-
// Cancels this query. If 'check_inflight' is true, fails with "Query not yet
// running" unless the query is in the session's inflight set (i.e. it actually
// started executing). 'cause', if non-NULL, carries the (non-OK) reason for the
// cancellation and is recorded in the query status; a NULL cause indicates a
// client-requested cancel. Both session_->lock and lock_ are released before
// the slow, RPC-issuing cancellation work runs.
Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* cause) {
  if (check_inflight) {
    // If the query is in 'inflight_queries' it means that the query has actually started
    // executing. It is ok if the query is removed from 'inflight_queries' during
    // cancellation, so we can release the session lock before starting the cancellation
    // work.
    lock_guard<mutex> session_lock(session_->lock);
    if (session_->inflight_queries.find(query_id()) == session_->inflight_queries.end()) {
      return Status("Query not yet running");
    }
  }

  Coordinator* coord;
  {
    lock_guard<mutex> lock(lock_);
    // If the query is completed or cancelled, no need to update state.
    bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
    if (!already_done && cause != NULL) {
      DCHECK(!cause->ok());
      // Recording the cause transitions the query into the EXCEPTION state.
      UpdateQueryStatus(*cause);
      query_events_->MarkEvent("Cancelled");
      DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
    }
    // Get a copy of the coordinator pointer while holding 'lock_'.
    coord = coord_.get();
    is_cancelled_ = true;
  } // Release lock_ before doing cancellation work.

  // Cancel and close child queries before cancelling parent. 'lock_' should not be held
  // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation
  // involves RPCs and can take quite some time.
  child_query_executor_->Cancel();

  // Cancel the parent query. 'lock_' should not be held because cancellation involves
  // RPCs and can block for a long time.
  if (coord != NULL) coord->Cancel(cause);
  return Status::OK();
}
-
// After a successful DML statement, pushes the set of altered partitions to the
// catalog service so the metastore reflects the newly written data. No-op for
// non-DML requests or when no finalize params are set. Returns an error if the
// catalog RPC or applying the resulting catalog update fails.
Status ImpalaServer::QueryExecState::UpdateCatalog() {
  if (!exec_request().__isset.query_exec_request ||
      exec_request().query_exec_request.stmt_type != TStmtType::DML) {
    return Status::OK();
  }

  query_events_->MarkEvent("DML data written");
  SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));

  TQueryExecRequest query_exec_request = exec_request().query_exec_request;
  if (query_exec_request.__isset.finalize_params) {
    const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
    TUpdateCatalogRequest catalog_update;
    catalog_update.__set_header(TCatalogServiceRequestHeader());
    catalog_update.header.__set_requesting_user(effective_user());
    // PrepareCatalogUpdate() returns false when no partitions were touched, in
    // which case there is nothing to tell the metastore.
    if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
      VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
                 << query_id() << ")";
    } else {
      // TODO: We track partitions written to, not created, which means
      // that we do more work than is necessary, because written-to
      // partitions don't always require a metastore change.
      VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size()
                 << " altered partitions ("
                 << join (catalog_update.created_partitions, ", ") << ")";

      catalog_update.target_table = finalize_params.table_name;
      catalog_update.db_name = finalize_params.table_db;

      Status cnxn_status;
      const TNetworkAddress& address =
          MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
      CatalogServiceConnection client(
          exec_env_->catalogd_client_cache(), address, &cnxn_status);
      RETURN_IF_ERROR(cnxn_status);

      VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
      TUpdateCatalogResponse resp;
      RETURN_IF_ERROR(
          client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp));

      Status status(resp.result.status);
      if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
      RETURN_IF_ERROR(status);
      // Apply the returned catalog update locally; honors SYNC_DDL if set.
      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
          exec_request_.query_options.sync_ddl));
    }
  }
  query_events_->MarkEvent("DML Metastore update finished");
  return Status::OK();
}
-
-void ImpalaServer::QueryExecState::SetResultSet(const TDdlExecResponse* 
ddl_resp) {
-  if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
-    result_metadata_ = ddl_resp->result_set.schema;
-    request_result_set_.reset(new 
vector<TResultRow>(ddl_resp->result_set.rows));
-  }
-}
-
-void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& results) 
{
-  request_result_set_.reset(new vector<TResultRow>);
-  request_result_set_->resize(results.size());
-  for (int i = 0; i < results.size(); ++i) {
-    (*request_result_set_.get())[i].__isset.colVals = true;
-    (*request_result_set_.get())[i].colVals.resize(1);
-    (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]);
-  }
-}
-
-void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& col1,
-    const vector<string>& col2) {
-  DCHECK_EQ(col1.size(), col2.size());
-
-  request_result_set_.reset(new vector<TResultRow>);
-  request_result_set_->resize(col1.size());
-  for (int i = 0; i < col1.size(); ++i) {
-    (*request_result_set_.get())[i].__isset.colVals = true;
-    (*request_result_set_.get())[i].colVals.resize(2);
-    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
-    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
-  }
-}
-
-void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& col1,
-    const vector<string>& col2, const vector<string>& col3, const 
vector<string>& col4) {
-  DCHECK_EQ(col1.size(), col2.size());
-  DCHECK_EQ(col1.size(), col3.size());
-  DCHECK_EQ(col1.size(), col4.size());
-
-  request_result_set_.reset(new vector<TResultRow>);
-  request_result_set_->resize(col1.size());
-  for (int i = 0; i < col1.size(); ++i) {
-    (*request_result_set_.get())[i].__isset.colVals = true;
-    (*request_result_set_.get())[i].colVals.resize(4);
-    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
-    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
-    (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
-    (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]);
-  }
-}
-
-void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
-  DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
-  int64_t total_num_rows_inserted = 0;
-  // There will only be rows inserted in the case a new table was created as 
part of this
-  // operation.
-  if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
-    DCHECK(coord_.get());
-    for (const PartitionStatusMap::value_type& p: 
coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_modified_rows;
-    }
-  }
-  const string& summary_msg = Substitute("Inserted $0 row(s)", 
total_num_rows_inserted);
-  VLOG_QUERY << summary_msg;
-  vector<string> results(1, summary_msg);
-  SetResultSet(results);
-}
-
// Marks this query as no longer in active use by a client RPC: starts the
// client-wait stopwatch, records the activity timestamp for idle-expiration
// tracking, and drops one reference. Must pair with a prior MarkActive().
// Note: the stopwatch is deliberately manipulated outside expiration_data_lock_.
void ImpalaServer::QueryExecState::MarkInactive() {
  client_wait_sw_.Start();
  lock_guard<mutex> l(expiration_data_lock_);
  last_active_time_ms_ = UnixMillis();
  DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
  --ref_count_;
}
-
// Marks this query as actively in use by a client RPC: stops the client-wait
// stopwatch and publishes the accumulated wait time to the profile counter,
// then records the activity timestamp and takes a reference that MarkInactive()
// later releases.
void ImpalaServer::QueryExecState::MarkActive() {
  client_wait_sw_.Stop();
  int64_t elapsed_time = client_wait_sw_.ElapsedTime();
  client_wait_timer_->Set(elapsed_time);
  lock_guard<mutex> l(expiration_data_lock_);
  last_active_time_ms_ = UnixMillis();
  ++ref_count_;
}
-
// Finalizes a COMPUTE STATS statement: sends the table-stats child query result
// (child_queries[0]) and, if present, the column-stats result (child_queries[1])
// to the catalog so the table's metadata is updated. On success the DDL
// response is exposed to the client as this query's result set.
Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats(
    const vector<ChildQuery*>& child_queries) {
  DCHECK_GE(child_queries.size(), 1);
  DCHECK_LE(child_queries.size(), 2);
  catalog_op_executor_.reset(
      new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));

  // If there was no column stats query, pass in empty thrift structures to
  // ExecComputeStats(). Otherwise pass in the column stats result.
  TTableSchema col_stats_schema;
  TRowSet col_stats_data;
  if (child_queries.size() > 1) {
    col_stats_schema = child_queries[1]->result_schema();
    col_stats_data = child_queries[1]->result_data();
  }

  Status status = catalog_op_executor_->ExecComputeStats(
      exec_request_.catalog_op_request.ddl_params.compute_stats_params,
      child_queries[0]->result_schema(),
      child_queries[0]->result_data(),
      col_stats_schema,
      col_stats_data);
  {
    // UpdateQueryStatus() requires lock_; it records a failed status in the
    // profile and returns it so a failure propagates to the caller here.
    lock_guard<mutex> l(lock_);
    RETURN_IF_ERROR(UpdateQueryStatus(status));
  }
  // Apply the catalog update locally; honors SYNC_DDL if set.
  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
      *catalog_op_executor_->update_catalog_result(),
      exec_request_.query_options.sync_ddl));

  // Set the results to be reported to the client.
  SetResultSet(catalog_op_executor_->ddl_exec_response());
  query_events_->MarkEvent("Metastore update finished");
  return Status::OK();
}
-
-void ImpalaServer::QueryExecState::ClearResultCache() {
-  if (result_cache_ == NULL) return;
-  // Update result set cache metrics and mem limit accounting.
-  
ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size());
-  int64_t total_bytes = result_cache_->ByteSize();
-  ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes);
-  if (coord_ != NULL) {
-    DCHECK(coord_->query_mem_tracker() != NULL);
-    coord_->query_mem_tracker()->Release(total_bytes);
-  }
-  result_cache_.reset(NULL);
-}
-}

Reply via email to