http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 34e204d..88ee934 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -69,6 +69,7 @@ class TSessionState; class TQueryOptions; class TGetExecSummaryResp; class TGetExecSummaryReq; +class ClientRequestState; /// An ImpalaServer contains both frontend and backend functionality; /// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service (HiveServer2) @@ -82,14 +83,14 @@ class TGetExecSummaryReq; /// 1. session_state_map_lock_ /// 2. SessionState::lock /// 3. query_expiration_lock_ -/// 4. query_exec_state_map_lock_ -/// 5. QueryExecState::fetch_rows_lock -/// 6. QueryExecState::lock -/// 7. QueryExecState::expiration_data_lock_ +/// 4. client_request_state_map_lock_ +/// 5. ClientRequestState::fetch_rows_lock +/// 6. ClientRequestState::lock +/// 7. ClientRequestState::expiration_data_lock_ /// 8. Coordinator::exec_summary_lock /// /// Coordinator::lock_ should not be acquired at the same time as the -/// ImpalaServer/SessionState/QueryExecState locks. Aside from +/// ImpalaServer/SessionState/ClientRequestState locks. Aside from /// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of /// the above lock ordering. /// @@ -142,7 +143,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// These APIs will not be implemented because ODBC driver does not use them. 
virtual void dump_config(std::string& config); - /// ImpalaService rpcs: extensions over Beeswax (implemented in impala-beeswax-server.cc) + /// ImpalaService rpcs: extensions over Beeswax (implemented in + /// impala-beeswax-server.cc) virtual void Cancel(impala::TStatus& status, const beeswax::QueryHandle& query_id); virtual void CloseInsert(impala::TInsertResult& insert_result, const beeswax::QueryHandle& query_handle); @@ -156,7 +158,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, virtual void GetExecSummary(impala::TExecSummary& result, const beeswax::QueryHandle& query_id); - /// Performs a full catalog metadata reset, invalidating all table and database metadata. + /// Performs a full catalog metadata reset, invalidating all table and database + /// metadata. virtual void ResetCatalog(impala::TStatus& status); /// Resets the specified table's catalog metadata, forcing a reload on the next access. @@ -229,7 +232,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, apache::hive::service::cli::thrift::TRenewDelegationTokenResp& return_val, const apache::hive::service::cli::thrift::TRenewDelegationTokenReq& req); - /// ImpalaService common extensions (implemented in impala-server.cc) /// ImpalaInternalService rpcs void ReportExecStatus(TReportExecStatusResult& return_val, const TReportExecStatusParams& params); @@ -247,8 +249,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// SessionHandlerIf methods /// Called when a Beeswax or HS2 connection starts. For Beeswax, registers a new - /// SessionState associated with the new connection. For HS2, this is a no-op (HS2 has an - /// explicit CreateSession RPC). + /// SessionState associated with the new connection. For HS2, this is a no-op (HS2 + /// has an explicit CreateSession RPC). 
virtual void ConnectionStart(const ThriftServer::ConnectionContext& session_context); /// Called when a Beeswax or HS2 connection terminates. Unregisters all sessions @@ -267,6 +269,22 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas, std::vector<TTopicDelta>* topic_updates); + /// Processes a CatalogUpdateResult returned from the CatalogServer and ensures + /// the update has been applied to the local impalad's catalog cache. If + /// wait_for_all_subscribers is true, this function will also wait until all + /// catalog topic subscribers have processed the update. Called from ClientRequestState + /// after executing any statement that modifies the catalog. + /// If wait_for_all_subscribers is false AND if the TCatalogUpdateResult contains + /// TCatalogObject(s) to add and/or remove, this function will update the local cache + /// by directly calling UpdateCatalog() with the TCatalogObject results. + /// Otherwise this function will wait until the local impalad's catalog cache has been + /// updated from a statestore heartbeat that includes this catalog update's catalog + /// version. If wait_for_all_subscribers is true, this function also wait all other + /// catalog topic subscribers to process this update by checking the current + /// min_subscriber_topic_version included in each state store heartbeat. + Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result, + bool wait_for_all_subscribers) WARN_UNUSED_RESULT; + /// Returns true if lineage logging is enabled, false otherwise. bool IsLineageLoggingEnabled(); @@ -282,17 +300,96 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// The prefix of audit event log filename. static const string AUDIT_EVENT_LOG_FILE_PREFIX; + /// Per-session state. This object is reference counted using shared_ptrs. 
There + /// is one ref count in the SessionStateMap for as long as the session is active. + /// All queries running from this session also have a reference. + struct SessionState { + /// The default hs2_version must be V1 so that child queries (which use HS2, but may + /// run as children of Beeswax sessions) get results back in the expected format - + /// child queries inherit the HS2 version from their parents, and a Beeswax session + /// will never update the HS2 version from the default. + SessionState() : closed(false), expired(false), + hs2_version(apache::hive::service::cli::thrift:: + TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), ref_count(0) { + } + + TSessionType::type session_type; + + /// Time the session was created, in ms since epoch (UTC). + int64_t start_time_ms; + + /// Connected user for this session, i.e. the user which originated this session. + std::string connected_user; + + /// The user to delegate to. Empty for no delegation. + std::string do_as_user; + + /// Client network address. + TNetworkAddress network_address; + + /// Protects all fields below. See "Locking" in the class comment for lock + /// acquisition order. + boost::mutex lock; + + /// If true, the session has been closed. + bool closed; + + /// If true, the session was idle for too long and has been expired. Only set when + /// ref_count == 0, after which point ref_count should never become non-zero (since + /// clients will use ScopedSessionState to access the session, which will prevent use + /// after expiration). + bool expired; + + /// The default database (changed as a result of 'use' query execution). + std::string database; + + /// The default query options of this session. When the session is created, the + /// session inherits the global defaults from ImpalaServer::default_query_options_. + TQueryOptions default_query_options; + + /// BitSet indicating which query options in default_query_options have been + /// explicitly set in the session. 
Updated when a query option is specified using a + /// SET command: the bit corresponding to the TImpalaQueryOptions enum is set. + QueryOptionsMask set_query_options_mask; + + /// For HS2 only, the protocol version this session is expecting. + apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version; + + /// Inflight queries belonging to this session + boost::unordered_set<TUniqueId> inflight_queries; + + /// Total number of queries run as part of this session. + int64_t total_queries; + + /// Time the session was last accessed, in ms since epoch (UTC). + int64_t last_accessed_ms; + + /// The latest Kudu timestamp observed after DML operations executed within this + /// session. + uint64_t kudu_latest_observed_ts; + + /// Number of RPCs concurrently accessing this session state. Used to detect when a + /// session may be correctly expired after a timeout (when ref_count == 0). Typically + /// at most one RPC will be issued against a session at a time, but clients may do + /// something unexpected and, for example, poll in one thread and fetch in another. + uint32_t ref_count; + + /// Per-session idle timeout in seconds. Default value is FLAGS_idle_session_timeout. + /// It can be overridden with a smaller value via the option "idle_session_timeout" + /// when opening a HS2 session. + int32_t session_timeout; + + /// Builds a Thrift representation of this SessionState for serialisation to + /// the frontend. + void ToThrift(const TUniqueId& session_id, TSessionState* session_state); + }; + private: friend class ChildQuery; friend class ImpalaHttpHandler; boost::scoped_ptr<ImpalaHttpHandler> http_handler_; - struct SessionState; - - /// Execution state of a query. 
- class QueryExecState; - /// Relevant ODBC SQL State code; for more info, /// goto http://msdn.microsoft.com/en-us/library/ms714687.aspx static const char* SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION; @@ -301,8 +398,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Return exec state for given query_id, or NULL if not found. /// If 'lock' is true, the returned exec state's lock() will be acquired before - /// the query_exec_state_map_lock_ is released. - std::shared_ptr<QueryExecState> GetQueryExecState( + /// the client_request_state_map_lock_ is released. + std::shared_ptr<ClientRequestState> GetClientRequestState( const TUniqueId& query_id, bool lock); /// Writes the session id, if found, for the given query to the output @@ -310,31 +407,30 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, bool GetSessionIdForQuery(const TUniqueId& query_id, TUniqueId* session_id); /// Updates the number of databases / tables metrics from the FE catalog - Status UpdateCatalogMetrics(); + Status UpdateCatalogMetrics() WARN_UNUSED_RESULT; - /// Starts asynchronous execution of query. Creates QueryExecState (returned + /// Starts asynchronous execution of query. Creates ClientRequestState (returned /// in exec_state), registers it and calls Coordinator::Execute(). /// If it returns with an error status, exec_state will be NULL and nothing - /// will have been registered in query_exec_state_map_. + /// will have been registered in client_request_state_map_. /// session_state is a ptr to the session running this query and must have /// been checked out. /// query_session_state is a snapshot of session state that changes when the /// query was run. (e.g. default database). 
Status Execute(TQueryCtx* query_ctx, - std::shared_ptr<SessionState> session_state, - std::shared_ptr<QueryExecState>* exec_state); + std::shared_ptr<SessionState> session_state, + std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT; /// Implements Execute() logic, but doesn't unregister query on error. Status ExecuteInternal(const TQueryCtx& query_ctx, - std::shared_ptr<SessionState> session_state, - bool* registered_exec_state, - std::shared_ptr<QueryExecState>* exec_state); + std::shared_ptr<SessionState> session_state, bool* registered_exec_state, + std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT; - /// Registers the query exec state with query_exec_state_map_ using the globally + /// Registers the query exec state with client_request_state_map_ using the globally /// unique query_id and add the query id to session state's open query list. /// The caller must have checked out the session state. Status RegisterQuery(std::shared_ptr<SessionState> session_state, - const std::shared_ptr<QueryExecState>& exec_state); + const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT; /// Adds the query to the set of in-flight queries for the session. The query remains /// in-flight until the query is unregistered. Until a query is in-flight, an attempt @@ -346,15 +442,15 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// The query must have already been registered using RegisterQuery(). The caller /// must have checked out the session state. 
Status SetQueryInflight(std::shared_ptr<SessionState> session_state, - const std::shared_ptr<QueryExecState>& exec_state); + const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT; /// Unregister the query by cancelling it, removing exec_state from - /// query_exec_state_map_, and removing the query id from session state's in-flight + /// client_request_state_map_, and removing the query id from session state's in-flight /// query list. If check_inflight is true, then return an error if the query is not /// yet in-flight. Otherwise, proceed even if the query isn't yet in-flight (for /// cleaning up after an error on the query issuing path). Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight, - const Status* cause = NULL); + const Status* cause = NULL) WARN_UNUSED_RESULT; /// Initiates query cancellation reporting the given cause as the query status. /// Assumes deliberate cancellation by the user if the cause is NULL. Returns an @@ -363,13 +459,14 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// be unregistered, after cancellation. Caller should not hold any locks when /// calling this function. Status CancelInternal(const TUniqueId& query_id, bool check_inflight, - const Status* cause = NULL); + const Status* cause = NULL) WARN_UNUSED_RESULT; /// Close the session and release all resource used by this session. /// Caller should not hold any locks when calling this function. /// If ignore_if_absent is true, returns OK even if a session with the supplied ID does /// not exist. - Status CloseSessionInternal(const TUniqueId& session_id, bool ignore_if_absent); + Status CloseSessionInternal(const TUniqueId& session_id, bool ignore_if_absent) + WARN_UNUSED_RESULT; /// Gets the runtime profile string for a given query_id and writes it to the output /// stream. First searches for the query id in the map of in-flight queries. 
If no @@ -379,10 +476,11 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// If base64_encoded, outputs the base64 encoded profile output, otherwise the human /// readable string. Status GetRuntimeProfileStr(const TUniqueId& query_id, bool base64_encoded, - std::stringstream* output); + std::stringstream* output) WARN_UNUSED_RESULT; /// Returns the exec summary for this query. - Status GetExecSummary(const TUniqueId& query_id, TExecSummary* result); + Status GetExecSummary(const TUniqueId& query_id, TExecSummary* result) + WARN_UNUSED_RESULT; /// Initialize "default_configs_" to show the default values for ImpalaQueryOptions and /// "support_start_over/false" to indicate that Impala does not support start over @@ -394,33 +492,35 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Returns OK unless there is some problem preventing profile log files /// from being written. If an error is returned, the constructor will disable /// profile logging. - Status InitProfileLogging(); + Status InitProfileLogging() WARN_UNUSED_RESULT; /// Checks settings for audit event logging, including whether the output /// directory exists and is writeable, and initialises the first log file. /// Returns OK unless there is some problem preventing audit event log files /// from being written. If an error is returned, impalad startup will be aborted. - Status InitAuditEventLogging(); + Status InitAuditEventLogging() WARN_UNUSED_RESULT; /// Checks settings for lineage logging, including whether the output /// directory exists and is writeable, and initialises the first log file. /// Returns OK unless there is some problem preventing lineage log files /// from being written. If an error is returned, impalad startup will be aborted. - Status InitLineageLogging(); + Status InitLineageLogging() WARN_UNUSED_RESULT; /// Initializes a logging directory, creating the directory if it does not already /// exist. 
If there is any error creating the directory an error will be returned. - static Status InitLoggingDir(const std::string& log_dir); + static Status InitLoggingDir(const std::string& log_dir) WARN_UNUSED_RESULT; /// Returns true if audit event logging is enabled, false otherwise. bool IsAuditEventLoggingEnabled(); - Status LogAuditRecord(const QueryExecState& exec_state, const TExecRequest& request); + Status LogAuditRecord( + const ClientRequestState& exec_state, const TExecRequest& request) + WARN_UNUSED_RESULT; - Status LogLineageRecord(const QueryExecState& exec_state); + Status LogLineageRecord(const ClientRequestState& exec_state) WARN_UNUSED_RESULT; /// Log audit and column lineage events - void LogQueryEvents(const QueryExecState& exec_state); + void LogQueryEvents(const ClientRequestState& exec_state); /// Runs once every 5s to flush the profile log file to disk. [[noreturn]] void LogFileFlushThread(); @@ -432,14 +532,15 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, [[noreturn]] void LineageLoggerFlushThread(); /// Copies a query's state into the query log. Called immediately prior to a - /// QueryExecState's deletion. Also writes the query profile to the profile log on disk. - /// Must be called with query_exec_state_map_lock_ held - void ArchiveQuery(const QueryExecState& query); + /// ClientRequestState's deletion. Also writes the query profile to the profile log + /// on disk. Must be called with client_request_state_map_lock_ held + void ArchiveQuery(const ClientRequestState& query); /// Checks whether the given user is allowed to delegate as the specified do_as_user. /// Returns OK if the authorization suceeds, otherwise returns an status with details /// on why the failure occurred. 
- Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user); + Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user) + WARN_UNUSED_RESULT; // Check if the local backend descriptor is in the list of known backends. If not, add // it to the list of known backends and add it to the 'topic_updates'. @@ -518,7 +619,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// otherwise leave this.profile empty. /// If encoded_str is non-empty, it is the base64 encoded string for /// exec_state->profile. - QueryStateRecord(const QueryExecState& exec_state, bool copy_profile = false, + QueryStateRecord(const ClientRequestState& exec_state, bool copy_profile = false, const std::string& encoded_str = ""); /// Default constructor used only when participating in collections @@ -533,7 +634,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Beeswax private methods /// Helper functions to translate between Beeswax and Impala structs - Status QueryToTQueryContext(const beeswax::Query& query, TQueryCtx* query_ctx); + Status QueryToTQueryContext(const beeswax::Query& query, TQueryCtx* query_ctx) + WARN_UNUSED_RESULT; void TUniqueIdToQueryHandle(const TUniqueId& query_id, beeswax::QueryHandle* handle); void QueryHandleToTUniqueId(const beeswax::QueryHandle& handle, TUniqueId* query_id); @@ -542,19 +644,21 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Executes the fetch logic. Doesn't clean up the exec state if an error occurs. Status FetchInternal(const TUniqueId& query_id, bool start_over, - int32_t fetch_size, beeswax::Results* query_results); + int32_t fetch_size, beeswax::Results* query_results) WARN_UNUSED_RESULT; /// Populate insert_result and clean up exec state. If the query /// status is an error, insert_result is not populated and the status is returned. 
- Status CloseInsertInternal(const TUniqueId& query_id, TInsertResult* insert_result); + Status CloseInsertInternal(const TUniqueId& query_id, TInsertResult* insert_result) + WARN_UNUSED_RESULT; /// HiveServer2 private methods (implemented in impala-hs2-server.cc) /// Starts the synchronous execution of a HiverServer2 metadata operation. - /// If the execution succeeds, an QueryExecState will be created and registered in - /// query_exec_state_map_. Otherwise, nothing will be registered in query_exec_state_map_ - /// and an error status will be returned. As part of this call, the TMetadataOpRequest - /// struct will be populated with the requesting user's session state. + /// If the execution succeeds, an ClientRequestState will be created and registered in + /// client_request_state_map_. Otherwise, nothing will be registered in + /// client_request_state_map_ and an error status will be returned. As part of this + /// call, the TMetadataOpRequest struct will be populated with the requesting user's + /// session state. /// Returns a TOperationHandle and TStatus. void ExecuteMetadataOp( const apache::hive::service::cli::thrift::THandleIdentifier& session_handle, @@ -566,20 +670,21 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// the query's state should be reset to fetch from the beginning of the result set. /// Doesn't clean up the exec state if an error occurs. 
Status FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool fetch_first, - apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results); + apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results) + WARN_UNUSED_RESULT; /// Helper functions to translate between HiveServer2 and Impala structs /// Returns !ok() if handle.guid.size() or handle.secret.size() != 16 static Status THandleIdentifierToTUniqueId( const apache::hive::service::cli::thrift::THandleIdentifier& handle, - TUniqueId* unique_id, TUniqueId* secret); + TUniqueId* unique_id, TUniqueId* secret) WARN_UNUSED_RESULT; static void TUniqueIdToTHandleIdentifier( const TUniqueId& unique_id, const TUniqueId& secret, apache::hive::service::cli::thrift::THandleIdentifier* handle); Status TExecuteStatementReqToTQueryContext( const apache::hive::service::cli::thrift::TExecuteStatementReq execute_request, - TQueryCtx* query_ctx); + TQueryCtx* query_ctx) WARN_UNUSED_RESULT; /// Helper method to process cancellations that result from failed backends, called from /// the cancellation thread pool. The cancellation_work contains the query id to cancel @@ -595,22 +700,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, void AddPoolQueryOptions(TQueryCtx* query_ctx, const QueryOptionsMask& override_options_mask); - /// Processes a CatalogUpdateResult returned from the CatalogServer and ensures - /// the update has been applied to the local impalad's catalog cache. If - /// wait_for_all_subscribers is true, this function will also wait until all - /// catalog topic subscribers have processed the update. Called from QueryExecState - /// after executing any statement that modifies the catalog. - /// If wait_for_all_subscribers is false AND if the TCatalogUpdateResult contains - /// TCatalogObject(s) to add and/or remove, this function will update the local cache - /// by directly calling UpdateCatalog() with the TCatalogObject results. 
- /// Otherwise this function will wait until the local impalad's catalog cache has been - /// updated from a statestore heartbeat that includes this catalog update's catalog - /// version. If wait_for_all_subscribers is true, this function also wait all other - /// catalog topic subscribers to process this update by checking the current - /// min_subscriber_topic_version included in each state store heartbeat. - Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result, - bool wait_for_all_subscribers); - /// Register timeout value upon opening a new session. This will wake up /// session_timeout_thread_ to update its poll period. void RegisterSessionTimeout(int32_t timeout); @@ -680,104 +769,20 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// on every poll period expiration or when the poll period changes. boost::condition_variable session_timeout_cv_; - /// map from query id to exec state; QueryExecState is owned by us and referenced + /// map from query id to exec state; ClientRequestState is owned by us and referenced /// as a shared_ptr to allow asynchronous deletion - typedef boost::unordered_map<TUniqueId, std::shared_ptr<QueryExecState>> - QueryExecStateMap; - QueryExecStateMap query_exec_state_map_; + typedef boost::unordered_map<TUniqueId, std::shared_ptr<ClientRequestState>> + ClientRequestStateMap; + ClientRequestStateMap client_request_state_map_; - /// Protects query_exec_state_map_. See "Locking" in the class comment for lock + /// Protects client_request_state_map_. See "Locking" in the class comment for lock /// acquisition order. - boost::mutex query_exec_state_map_lock_; + boost::mutex client_request_state_map_lock_; /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable TQueryOptions default_query_options_; std::vector<beeswax::ConfigVariable> default_configs_; - /// Per-session state. This object is reference counted using shared_ptrs. 
There - /// is one ref count in the SessionStateMap for as long as the session is active. - /// All queries running from this session also have a reference. - struct SessionState { - /// The default hs2_version must be V1 so that child queries (which use HS2, but may - /// run as children of Beeswax sessions) get results back in the expected format - - /// child queries inherit the HS2 version from their parents, and a Beeswax session - /// will never update the HS2 version from the default. - SessionState() : closed(false), expired(false), - hs2_version(apache::hive::service::cli::thrift:: - TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), ref_count(0) { - } - - TSessionType::type session_type; - - /// Time the session was created, in ms since epoch (UTC). - int64_t start_time_ms; - - /// Connected user for this session, i.e. the user which originated this session. - std::string connected_user; - - /// The user to delegate to. Empty for no delegation. - std::string do_as_user; - - /// Client network address. - TNetworkAddress network_address; - - /// Protects all fields below. See "Locking" in the class comment for lock - /// acquisition order. - boost::mutex lock; - - /// If true, the session has been closed. - bool closed; - - /// If true, the session was idle for too long and has been expired. Only set when - /// ref_count == 0, after which point ref_count should never become non-zero (since - /// clients will use ScopedSessionState to access the session, which will prevent use - /// after expiration). - bool expired; - - /// The default database (changed as a result of 'use' query execution). - std::string database; - - /// The default query options of this session. When the session is created, the - /// session inherits the global defaults from ImpalaServer::default_query_options_. - TQueryOptions default_query_options; - - /// BitSet indicating which query options in default_query_options have been - /// explicitly set in the session. 
Updated when a query option is specified using a - /// SET command: the bit corresponding to the TImpalaQueryOptions enum is set. - QueryOptionsMask set_query_options_mask; - - /// For HS2 only, the protocol version this session is expecting. - apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version; - - /// Inflight queries belonging to this session - boost::unordered_set<TUniqueId> inflight_queries; - - /// Total number of queries run as part of this session. - int64_t total_queries; - - /// Time the session was last accessed, in ms since epoch (UTC). - int64_t last_accessed_ms; - - /// The latest Kudu timestamp observed after DML operations executed within this - /// session. - uint64_t kudu_latest_observed_ts; - - /// Number of RPCs concurrently accessing this session state. Used to detect when a - /// session may be correctly expired after a timeout (when ref_count == 0). Typically - /// at most one RPC will be issued against a session at a time, but clients may do - /// something unexpected and, for example, poll in one thread and fetch in another. - uint32_t ref_count; - - /// Per-session idle timeout in seconds. Default value is FLAGS_idle_session_timeout. - /// It can be overridden with a smaller value via the option "idle_session_timeout" - /// when opening a HS2 session. - int32_t session_timeout; - - /// Builds a Thrift representation of this SessionState for serialisation to - /// the frontend. - void ToThrift(const TUniqueId& session_id, TSessionState* session_state); - }; - /// Class that allows users of SessionState to mark a session as in-use, and therefore /// immune to expiration. The marking is done in WithSession() and undone in the /// destructor, so this class can be used to 'check-out' a session for the duration of a @@ -790,7 +795,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// object goes out of scope. Returns OK unless there is an error in GetSessionState. 
/// Must only be called once per ScopedSessionState. Status WithSession(const TUniqueId& session_id, - std::shared_ptr<SessionState>* session = NULL) { + std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT { DCHECK(session_.get() == NULL); RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true)); if (session != NULL) (*session) = session_; @@ -840,7 +845,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// If mark_active is true, also checks if the session is expired or closed and /// increments the session's reference counter if it is still alive. Status GetSessionState(const TUniqueId& session_id, - std::shared_ptr<SessionState>* session_state, bool mark_active = false); + std::shared_ptr<SessionState>* session_state, bool mark_active = false) + WARN_UNUSED_RESULT; /// Decrement the session's reference counter and mark last_accessed_ms so that state /// expiration can proceed.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc deleted file mode 100644 index 72dba6f..0000000 --- a/be/src/service/query-exec-state.cc +++ /dev/null @@ -1,1084 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-#include "service/query-exec-state.h" - -#include <limits> -#include <gutil/strings/substitute.h> - -#include "exprs/expr-context.h" -#include "exprs/expr.h" -#include "runtime/mem-tracker.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-state.h" -#include "runtime/exec-env.h" -#include "scheduling/admission-controller.h" -#include "scheduling/scheduler.h" -#include "service/frontend.h" -#include "service/impala-server.h" -#include "service/query-options.h" -#include "service/query-result-set.h" -#include "util/debug-util.h" -#include "util/impalad-metrics.h" -#include "util/runtime-profile-counters.h" -#include "util/time.h" - -#include "gen-cpp/CatalogService.h" -#include "gen-cpp/CatalogService_types.h" - -#include <thrift/Thrift.h> - -#include "common/names.h" - -using boost::algorithm::join; -using namespace apache::hive::service::cli::thrift; -using namespace apache::thrift; -using namespace beeswax; -using namespace strings; - -DECLARE_int32(catalog_service_port); -DECLARE_string(catalog_service_host); -DECLARE_int64(max_result_cache_size); - -namespace impala { - -// Keys into the info string map of the runtime profile referring to specific -// items used by CM for monitoring purposes. 
-static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem"; -static const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory Reservation"; -static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats"; -static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats"; -static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids"; - -ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx, - ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server, - shared_ptr<SessionState> session) - : query_ctx_(query_ctx), - last_active_time_ms_(numeric_limits<int64_t>::max()), - ref_count_(0L), - child_query_executor_(new ChildQueryExecutor), - exec_env_(exec_env), - is_block_on_wait_joining_(false), - session_(session), - schedule_(NULL), - coord_(NULL), - result_cache_max_size_(-1), - profile_(&profile_pool_, "Query"), // assign name w/ id after planning - server_profile_(&profile_pool_, "ImpalaServer"), - summary_profile_(&profile_pool_, "Summary"), - is_cancelled_(false), - eos_(false), - query_state_(beeswax::QueryState::CREATED), - current_batch_(NULL), - current_batch_row_(0), - num_rows_fetched_(0), - fetched_rows_(false), - frontend_(frontend), - parent_server_(server), - start_time_(TimestampValue::LocalTime()) { -#ifndef NDEBUG - profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a " - "DEBUG build of Impala. 
Use RELEASE builds to measure query performance."); -#endif - row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer"); - client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer"); - query_events_ = summary_profile_.AddEventSequence("Query Timeline"); - query_events_->Start(); - profile_.AddChild(&summary_profile_); - - profile_.set_name("Query (id=" + PrintId(query_id()) + ")"); - summary_profile_.AddInfoString("Session ID", PrintId(session_id())); - summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type())); - if (session_type() == TSessionType::HIVESERVER2) { - summary_profile_.AddInfoString("HiveServer2 Protocol Version", - Substitute("V$0", 1 + session->hs2_version)); - } - summary_profile_.AddInfoString("Start Time", start_time().DebugString()); - summary_profile_.AddInfoString("End Time", ""); - summary_profile_.AddInfoString("Query Type", "N/A"); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); - summary_profile_.AddInfoString("Query Status", "OK"); - summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true)); - summary_profile_.AddInfoString("User", effective_user()); - summary_profile_.AddInfoString("Connected User", connected_user()); - summary_profile_.AddInfoString("Delegated User", do_as_user()); - summary_profile_.AddInfoString("Network Address", - lexical_cast<string>(session_->network_address)); - summary_profile_.AddInfoString("Default Db", default_db()); - summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt); - summary_profile_.AddInfoString("Coordinator", - TNetworkAddressToString(exec_env->backend_address())); -} - -ImpalaServer::QueryExecState::~QueryExecState() { - DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!"; -} - -Status ImpalaServer::QueryExecState::SetResultCache(QueryResultSet* cache, - int64_t max_size) { - lock_guard<mutex> l(lock_); - DCHECK(result_cache_ == 
NULL); - result_cache_.reset(cache); - if (max_size > FLAGS_max_result_cache_size) { - return Status( - Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.", - max_size, FLAGS_max_result_cache_size)); - } - result_cache_max_size_ = max_size; - return Status::OK(); -} - -Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) { - MarkActive(); - exec_request_ = *exec_request; - - profile_.AddChild(&server_profile_); - summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type())); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); - summary_profile_.AddInfoString("Query Options (non default)", - DebugQueryOptions(query_ctx_.client_request.query_options)); - - switch (exec_request->stmt_type) { - case TStmtType::QUERY: - case TStmtType::DML: - DCHECK(exec_request_.__isset.query_exec_request); - return ExecQueryOrDmlRequest(exec_request_.query_exec_request); - case TStmtType::EXPLAIN: { - request_result_set_.reset(new vector<TResultRow>( - exec_request_.explain_result.results)); - return Status::OK(); - } - case TStmtType::DDL: { - DCHECK(exec_request_.__isset.catalog_op_request); - return ExecDdlRequest(); - } - case TStmtType::LOAD: { - DCHECK(exec_request_.__isset.load_data_request); - TLoadDataResp response; - RETURN_IF_ERROR( - frontend_->LoadData(exec_request_.load_data_request, &response)); - request_result_set_.reset(new vector<TResultRow>); - request_result_set_->push_back(response.load_summary); - - // Now refresh the table metadata. 
- TCatalogOpRequest reset_req; - reset_req.__set_op_type(TCatalogOpType::RESET_METADATA); - reset_req.__set_reset_metadata_params(TResetMetadataRequest()); - reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader()); - reset_req.reset_metadata_params.__set_is_refresh(true); - reset_req.reset_metadata_params.__set_table_name( - exec_request_.load_data_request.table_name); - catalog_op_executor_.reset( - new CatalogOpExecutor(exec_env_, frontend_, &server_profile_)); - RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req)); - RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( - *catalog_op_executor_->update_catalog_result(), - exec_request_.query_options.sync_ddl)); - return Status::OK(); - } - case TStmtType::SET: { - DCHECK(exec_request_.__isset.set_query_option_request); - lock_guard<mutex> l(session_->lock); - if (exec_request_.set_query_option_request.__isset.key) { - // "SET key=value" updates the session query options. - DCHECK(exec_request_.set_query_option_request.__isset.value); - RETURN_IF_ERROR(SetQueryOption( - exec_request_.set_query_option_request.key, - exec_request_.set_query_option_request.value, - &session_->default_query_options, - &session_->set_query_options_mask)); - SetResultSet({}, {}); - } else { - // "SET" returns a table of all query options. 
- map<string, string> config; - TQueryOptionsToMap( - session_->default_query_options, &config); - vector<string> keys, values; - map<string, string>::const_iterator itr = config.begin(); - for (; itr != config.end(); ++itr) { - keys.push_back(itr->first); - values.push_back(itr->second); - } - SetResultSet(keys, values); - } - return Status::OK(); - } - default: - stringstream errmsg; - errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type; - return Status(errmsg.str()); - } -} - -Status ImpalaServer::QueryExecState::ExecLocalCatalogOp( - const TCatalogOpRequest& catalog_op) { - switch (catalog_op.op_type) { - case TCatalogOpType::USE: { - lock_guard<mutex> l(session_->lock); - session_->database = exec_request_.catalog_op_request.use_db_params.db; - return Status::OK(); - } - case TCatalogOpType::SHOW_TABLES: { - const TShowTablesParams* params = &catalog_op.show_tables_params; - // A NULL pattern means match all tables. However, Thrift string types can't - // be NULL in C++, so we have to test if it's set rather than just blindly - // using the value. - const string* table_name = - params->__isset.show_pattern ? &(params->show_pattern) : NULL; - TGetTablesResult table_names; - RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name, - &query_ctx_.session, &table_names)); - SetResultSet(table_names.tables); - return Status::OK(); - } - case TCatalogOpType::SHOW_DBS: { - const TShowDbsParams* params = &catalog_op.show_dbs_params; - TGetDbsResult dbs; - const string* db_pattern = - params->__isset.show_pattern ? 
(&params->show_pattern) : NULL; - RETURN_IF_ERROR( - frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs)); - vector<string> names, comments; - names.reserve(dbs.dbs.size()); - comments.reserve(dbs.dbs.size()); - for (const TDatabase& db: dbs.dbs) { - names.push_back(db.db_name); - comments.push_back(db.metastore_db.description); - } - SetResultSet(names, comments); - return Status::OK(); - } - case TCatalogOpType::SHOW_DATA_SRCS: { - const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params; - TGetDataSrcsResult result; - const string* pattern = - params->__isset.show_pattern ? (&params->show_pattern) : NULL; - RETURN_IF_ERROR( - frontend_->GetDataSrcMetadata(pattern, &result)); - SetResultSet(result.data_src_names, result.locations, result.class_names, - result.api_versions); - return Status::OK(); - } - case TCatalogOpType::SHOW_STATS: { - const TShowStatsParams& params = catalog_op.show_stats_params; - TResultSet response; - RETURN_IF_ERROR(frontend_->GetStats(params, &response)); - // Set the result set and its schema from the response. - request_result_set_.reset(new vector<TResultRow>(response.rows)); - result_metadata_ = response.schema; - return Status::OK(); - } - case TCatalogOpType::SHOW_FUNCTIONS: { - const TShowFunctionsParams* params = &catalog_op.show_fns_params; - TGetFunctionsResult functions; - const string* fn_pattern = - params->__isset.show_pattern ? (&params->show_pattern) : NULL; - RETURN_IF_ERROR(frontend_->GetFunctions( - params->category, params->db, fn_pattern, &query_ctx_.session, &functions)); - SetResultSet(functions.fn_ret_types, functions.fn_signatures, - functions.fn_binary_types, functions.fn_persistence); - return Status::OK(); - } - case TCatalogOpType::SHOW_ROLES: { - const TShowRolesParams& params = catalog_op.show_roles_params; - if (params.is_admin_op) { - // Verify the user has privileges to perform this operation by checking against - // the Sentry Service (via the Catalog Server). 
- catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, - &server_profile_)); - - TSentryAdminCheckRequest req; - req.__set_header(TCatalogServiceRequestHeader()); - req.header.__set_requesting_user(effective_user()); - RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req)); - } - - // If we have made it here, the user has privileges to execute this operation. - // Return the results. - TShowRolesResult result; - RETURN_IF_ERROR(frontend_->ShowRoles(params, &result)); - SetResultSet(result.role_names); - return Status::OK(); - } - case TCatalogOpType::SHOW_GRANT_ROLE: { - const TShowGrantRoleParams& params = catalog_op.show_grant_role_params; - if (params.is_admin_op) { - // Verify the user has privileges to perform this operation by checking against - // the Sentry Service (via the Catalog Server). - catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, - &server_profile_)); - - TSentryAdminCheckRequest req; - req.__set_header(TCatalogServiceRequestHeader()); - req.header.__set_requesting_user(effective_user()); - RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req)); - } - - TResultSet response; - RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response)); - // Set the result set and its schema from the response. 
- request_result_set_.reset(new vector<TResultRow>(response.rows)); - result_metadata_ = response.schema; - return Status::OK(); - } - case TCatalogOpType::DESCRIBE_DB: { - TDescribeResult response; - RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params, - &response)); - // Set the result set - request_result_set_.reset(new vector<TResultRow>(response.results)); - return Status::OK(); - } - case TCatalogOpType::DESCRIBE_TABLE: { - TDescribeResult response; - RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params, - &response)); - // Set the result set - request_result_set_.reset(new vector<TResultRow>(response.results)); - return Status::OK(); - } - case TCatalogOpType::SHOW_CREATE_TABLE: { - string response; - RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params, - &response)); - SetResultSet(vector<string>(1, response)); - return Status::OK(); - } - case TCatalogOpType::SHOW_CREATE_FUNCTION: { - string response; - RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params, - &response)); - SetResultSet(vector<string>(1, response)); - return Status::OK(); - } - case TCatalogOpType::SHOW_FILES: { - TResultSet response; - RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response)); - // Set the result set and its schema from the response. 
- request_result_set_.reset(new vector<TResultRow>(response.rows)); - result_metadata_ = response.schema; - return Status::OK(); - } - default: { - stringstream ss; - ss << "Unexpected TCatalogOpType: " << catalog_op.op_type; - return Status(ss.str()); - } - } -} - -Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( - const TQueryExecRequest& query_exec_request) { - // we always need at least one plan fragment - DCHECK(query_exec_request.plan_exec_info.size() > 0); - - if (query_exec_request.__isset.query_plan) { - stringstream plan_ss; - // Add some delimiters to make it clearer where the plan - // begins and the profile ends - plan_ss << "\n----------------\n" - << query_exec_request.query_plan - << "----------------"; - summary_profile_.AddInfoString("Plan", plan_ss.str()); - } - // Add info strings consumed by CM: Estimated mem and tables missing stats. - if (query_exec_request.__isset.per_host_mem_estimate) { - stringstream ss; - ss << query_exec_request.per_host_mem_estimate; - summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str()); - } - if (query_exec_request.__isset.per_host_min_reservation) { - stringstream ss; - ss << query_exec_request.per_host_min_reservation; - summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str()); - } - if (!query_exec_request.query_ctx.__isset.parent_query_id && - query_exec_request.query_ctx.__isset.tables_missing_stats && - !query_exec_request.query_ctx.tables_missing_stats.empty()) { - stringstream ss; - const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats; - for (int i = 0; i < tbls.size(); ++i) { - if (i != 0) ss << ","; - ss << tbls[i].db_name << "." 
<< tbls[i].table_name; - } - summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str()); - } - - if (!query_exec_request.query_ctx.__isset.parent_query_id && - query_exec_request.query_ctx.__isset.tables_with_corrupt_stats && - !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) { - stringstream ss; - const vector<TTableName>& tbls = - query_exec_request.query_ctx.tables_with_corrupt_stats; - for (int i = 0; i < tbls.size(); ++i) { - if (i != 0) ss << ","; - ss << tbls[i].db_name << "." << tbls[i].table_name; - } - summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); - } - - if (query_exec_request.query_ctx.__isset.tables_missing_diskids && - !query_exec_request.query_ctx.tables_missing_diskids.empty()) { - stringstream ss; - const vector<TTableName>& tbls = - query_exec_request.query_ctx.tables_missing_diskids; - for (int i = 0; i < tbls.size(); ++i) { - if (i != 0) ss << ","; - ss << tbls[i].db_name << "." << tbls[i].table_name; - } - summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str()); - } - - { - lock_guard<mutex> l(lock_); - // Don't start executing the query if Cancel() was called concurrently with Exec(). 
- if (is_cancelled_) return Status::CANCELLED; - // TODO: make schedule local to coordinator and move schedule_->Release() into - // Coordinator::TearDown() - schedule_.reset(new QuerySchedule(query_id(), query_exec_request, - exec_request_.query_options, &summary_profile_, query_events_)); - } - Status status = exec_env_->scheduler()->Schedule(schedule_.get()); - { - lock_guard<mutex> l(lock_); - RETURN_IF_ERROR(UpdateQueryStatus(status)); - } - - if (exec_env_->admission_controller() != nullptr) { - status = exec_env_->admission_controller()->AdmitQuery(schedule_.get()); - { - lock_guard<mutex> l(lock_); - RETURN_IF_ERROR(UpdateQueryStatus(status)); - } - } - - coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_)); - status = coord_->Exec(); - { - lock_guard<mutex> l(lock_); - RETURN_IF_ERROR(UpdateQueryStatus(status)); - } - - profile_.AddChild(coord_->query_profile()); - return Status::OK(); -} - -Status ImpalaServer::QueryExecState::ExecDdlRequest() { - string op_type = catalog_op_type() == TCatalogOpType::DDL ? - PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type()); - summary_profile_.AddInfoString("DDL Type", op_type); - - if (catalog_op_type() != TCatalogOpType::DDL && - catalog_op_type() != TCatalogOpType::RESET_METADATA) { - Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request); - lock_guard<mutex> l(lock_); - return UpdateQueryStatus(status); - } - - if (ddl_type() == TDdlType::COMPUTE_STATS) { - TComputeStatsParams& compute_stats_params = - exec_request_.catalog_op_request.ddl_params.compute_stats_params; - // Add child queries for computing table and column stats. 
- vector<ChildQuery> child_queries; - if (compute_stats_params.__isset.tbl_stats_query) { - child_queries.push_back( - ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_)); - } - if (compute_stats_params.__isset.col_stats_query) { - child_queries.push_back( - ChildQuery(compute_stats_params.col_stats_query, this, parent_server_)); - } - - if (child_queries.size() > 0) child_query_executor_->ExecAsync(move(child_queries)); - return Status::OK(); - } - - catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, - &server_profile_)); - Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request); - { - lock_guard<mutex> l(lock_); - RETURN_IF_ERROR(UpdateQueryStatus(status)); - } - - // If this is a CTAS request, there will usually be more work to do - // after executing the CREATE TABLE statement (the INSERT portion of the operation). - // The exception is if the user specified IF NOT EXISTS and the table already - // existed, in which case we do not execute the INSERT. - if (catalog_op_type() == TCatalogOpType::DDL && - ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT && - !catalog_op_executor_->ddl_exec_response()->new_table_created) { - DCHECK(exec_request_.catalog_op_request. - ddl_params.create_table_params.if_not_exists); - return Status::OK(); - } - - // Add newly created table to catalog cache. - RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( - *catalog_op_executor_->update_catalog_result(), - exec_request_.query_options.sync_ddl)); - - if (catalog_op_type() == TCatalogOpType::DDL && - ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) { - // At this point, the remainder of the CTAS request executes - // like a normal DML request. As with other DML requests, it will - // wait for another catalog update if any partitions were altered as a result - // of the operation. 
- DCHECK(exec_request_.__isset.query_exec_request); - RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request)); - } - - // Set the results to be reported to the client. - SetResultSet(catalog_op_executor_->ddl_exec_response()); - return Status::OK(); -} - -void ImpalaServer::QueryExecState::Done() { - MarkActive(); - // Make sure we join on wait_thread_ before we finish (and especially before this object - // is destroyed). - BlockOnWait(); - - // Update latest observed Kudu timestamp stored in the session from the coordinator. - // Needs to take the session_ lock which must not be taken while holding lock_, so this - // must happen before taking lock_ below. - if (coord_.get() != NULL) { - // This is safe to access on coord_ after Wait() has been called. - uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp(); - if (latest_kudu_ts > 0) { - VLOG_RPC << "Updating session (id=" << session_id() << ") with latest " - << "observed Kudu timestamp: " << latest_kudu_ts; - lock_guard<mutex> session_lock(session_->lock); - session_->kudu_latest_observed_ts = std::max<uint64_t>( - session_->kudu_latest_observed_ts, latest_kudu_ts); - } - } - - unique_lock<mutex> l(lock_); - end_time_ = TimestampValue::LocalTime(); - summary_profile_.AddInfoString("End Time", end_time().DebugString()); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); - query_events_->MarkEvent("Unregister query"); - - // Update result set cache metrics, and update mem limit accounting before tearing - // down the coordinator. - ClearResultCache(); - - if (coord_.get() != NULL) { - // Release any reserved resources. 
- if (exec_env_->admission_controller() != nullptr) { - Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get()); - if (!status.ok()) { - LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id() - << " because of error: " << status.GetDetail(); - } - } - coord_->TearDown(); - } -} - -Status ImpalaServer::QueryExecState::Exec(const TMetadataOpRequest& exec_request) { - TResultSet metadata_op_result; - // Like the other Exec(), fill out as much profile information as we're able to. - summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL)); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); - RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request, - &metadata_op_result)); - result_metadata_ = metadata_op_result.schema; - request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows)); - return Status::OK(); -} - -void ImpalaServer::QueryExecState::WaitAsync() { - wait_thread_.reset(new Thread( - "query-exec-state", "wait-thread", &ImpalaServer::QueryExecState::Wait, this)); -} - -void ImpalaServer::QueryExecState::BlockOnWait() { - unique_lock<mutex> l(lock_); - if (wait_thread_.get() == NULL) return; - if (!is_block_on_wait_joining_) { - // No other thread is already joining on wait_thread_, so this thread needs to do - // it. Other threads will need to block on the cond-var. - is_block_on_wait_joining_ = true; - l.unlock(); - wait_thread_->Join(); - l.lock(); - is_block_on_wait_joining_ = false; - wait_thread_.reset(); - block_on_wait_cv_.notify_all(); - } else { - // Another thread is already joining with wait_thread_. Block on the cond-var - // until the Join() executed in the other thread has completed. 
- do { - block_on_wait_cv_.wait(l); - } while (is_block_on_wait_joining_); - } -} - -void ImpalaServer::QueryExecState::Wait() { - // block until results are ready - Status status = WaitInternal(); - { - lock_guard<mutex> l(lock_); - if (returns_result_set()) { - query_events()->MarkEvent("Rows available"); - } else { - query_events()->MarkEvent("Request finished"); - } - UpdateQueryStatus(status); - } - if (status.ok()) { - UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); - } -} - -Status ImpalaServer::QueryExecState::WaitInternal() { - // Explain requests have already populated the result set. Nothing to do here. - if (exec_request_.stmt_type == TStmtType::EXPLAIN) { - MarkInactive(); - return Status::OK(); - } - - vector<ChildQuery*> child_queries; - Status child_queries_status = child_query_executor_->WaitForAll(&child_queries); - { - lock_guard<mutex> l(lock_); - RETURN_IF_ERROR(query_status_); - RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status)); - } - if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished"); - - if (coord_.get() != NULL) { - RETURN_IF_ERROR(coord_->Wait()); - RETURN_IF_ERROR(UpdateCatalog()); - } - - if (catalog_op_type() == TCatalogOpType::DDL && - ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) { - RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries)); - } - - if (!returns_result_set()) { - // Queries that do not return a result are finished at this point. This includes - // DML operations and a subset of the DDL operations. - eos_ = true; - } else if (catalog_op_type() == TCatalogOpType::DDL && - ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) { - SetCreateTableAsSelectResultSet(); - } - // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks - // how long Impala waits for the client to fetch rows. For other statements, track the - // time until a Close() is received. 
- MarkInactive(); - return Status::OK(); -} - -Status ImpalaServer::QueryExecState::FetchRows(const int32_t max_rows, - QueryResultSet* fetched_rows) { - // Pause the wait timer, since the client has instructed us to do work on its behalf. - MarkActive(); - - // ImpalaServer::FetchInternal has already taken our lock_ - UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows)); - - MarkInactive(); - return query_status_; -} - -Status ImpalaServer::QueryExecState::RestartFetch() { - // No result caching for this query. Restart is invalid. - if (result_cache_max_size_ <= 0) { - return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, - "Restarting of fetch requires enabling of query result caching.")); - } - // The cache overflowed on a previous fetch. - if (result_cache_.get() == NULL) { - stringstream ss; - ss << "The query result cache exceeded its limit of " << result_cache_max_size_ - << " rows. Restarting the fetch is not possible."; - return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str())); - } - // Reset fetch state to start over. 
- eos_ = false; - num_rows_fetched_ = 0; - return Status::OK(); -} - -void ImpalaServer::QueryExecState::UpdateNonErrorQueryState( - beeswax::QueryState::type query_state) { - lock_guard<mutex> l(lock_); - DCHECK(query_state != beeswax::QueryState::EXCEPTION); - if (query_state_ < query_state) query_state_ = query_state; -} - -Status ImpalaServer::QueryExecState::UpdateQueryStatus(const Status& status) { - // Preserve the first non-ok status - if (!status.ok() && query_status_.ok()) { - query_state_ = beeswax::QueryState::EXCEPTION; - query_status_ = status; - summary_profile_.AddInfoString("Query Status", query_status_.GetDetail()); - } - - return status; -} - -Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, - QueryResultSet* fetched_rows) { - DCHECK(query_state_ != beeswax::QueryState::EXCEPTION); - - if (eos_) return Status::OK(); - - if (request_result_set_ != NULL) { - query_state_ = beeswax::QueryState::FINISHED; - int num_rows = 0; - const vector<TResultRow>& all_rows = (*(request_result_set_.get())); - // max_rows <= 0 means no limit - while ((num_rows < max_rows || max_rows <= 0) - && num_rows_fetched_ < all_rows.size()) { - fetched_rows->AddOneRow(all_rows[num_rows_fetched_]); - ++num_rows_fetched_; - ++num_rows; - } - eos_ = (num_rows_fetched_ == all_rows.size()); - return Status::OK(); - } - - if (coord_.get() == nullptr) { - return Status("Client tried to fetch rows on a query that produces no results."); - } - - int32_t num_rows_fetched_from_cache = 0; - if (result_cache_max_size_ > 0 && result_cache_ != NULL) { - // Satisfy the fetch from the result cache if possible. - int cache_fetch_size = (max_rows <= 0) ? 
result_cache_->size() : max_rows; - num_rows_fetched_from_cache = - fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size); - num_rows_fetched_ += num_rows_fetched_from_cache; - if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); - } - - query_state_ = beeswax::QueryState::FINISHED; // results will be ready after this call - - // Maximum number of rows to be fetched from the coord. - int32_t max_coord_rows = max_rows; - if (max_rows > 0) { - DCHECK_LE(num_rows_fetched_from_cache, max_rows); - max_coord_rows = max_rows - num_rows_fetched_from_cache; - } - { - SCOPED_TIMER(row_materialization_timer_); - size_t before = fetched_rows->size(); - // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_ - // (already held) ensures that we do not call coord_->GetNext() multiple times - // concurrently. - // TODO: Simplify this. - lock_.unlock(); - Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_); - lock_.lock(); - int num_fetched = fetched_rows->size() - before; - DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute( - "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows); - num_rows_fetched_ += num_fetched; - - RETURN_IF_ERROR(status); - // Check if query status has changed during GetNext() call - if (!query_status_.ok()) { - eos_ = true; - return query_status_; - } - } - - // Update the result cache if necessary. - if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) { - int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache; - if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) { - // Set the cache to NULL to indicate that adding the rows fetched from the coord - // would exceed the bound of the cache, and therefore, RestartFetch() should fail. 
- ClearResultCache(); - return Status::OK(); - } - - // We guess the size of the cache after adding fetched_rows by looking at the size of - // fetched_rows itself, and using this estimate to confirm that the memtracker will - // allow us to use this much extra memory. In fact, this might be an overestimate, as - // the size of two result sets combined into one is not always the size of both result - // sets added together (the best example is the null bitset for each column: it might - // have only one entry in each result set, and as a result consume two bytes, but when - // the result sets are combined, only one byte is needed). Therefore after we add the - // new result set into the cache, we need to fix up the memory consumption to the - // actual levels to ensure we don't 'leak' bytes that we aren't using. - int64_t before = result_cache_->ByteSize(); - - // Upper-bound on memory required to add fetched_rows to the cache. - int64_t delta_bytes = - fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size()); - MemTracker* query_mem_tracker = coord_->query_mem_tracker(); - // Count the cached rows towards the mem limit. - if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) { - string details("Failed to allocate memory for result cache."); - return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details, - delta_bytes); - } - // Append all rows fetched from the coordinator into the cache. - int num_rows_added = result_cache_->AddRows( - fetched_rows, num_rows_fetched_from_cache, fetched_rows->size()); - - int64_t after = result_cache_->ByteSize(); - - // Confirm that this was not an underestimate of the memory required. 
- DCHECK_GE(before + delta_bytes, after) - << "Combined result sets consume more memory than both individually " - << Substitute("(before: $0, delta_bytes: $1, after: $2)", - before, delta_bytes, after); - - // Fix up the tracked values - if (before + delta_bytes > after) { - query_mem_tracker->Release(before + delta_bytes - after); - delta_bytes = after - before; - } - - // Update result set cache metrics. - ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added); - ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes); - } - - return Status::OK(); -} - -Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* cause) { - if (check_inflight) { - // If the query is in 'inflight_queries' it means that the query has actually started - // executing. It is ok if the query is removed from 'inflight_queries' during - // cancellation, so we can release the session lock before starting the cancellation - // work. - lock_guard<mutex> session_lock(session_->lock); - if (session_->inflight_queries.find(query_id()) == session_->inflight_queries.end()) { - return Status("Query not yet running"); - } - } - - Coordinator* coord; - { - lock_guard<mutex> lock(lock_); - // If the query is completed or cancelled, no need to update state. - bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION; - if (!already_done && cause != NULL) { - DCHECK(!cause->ok()); - UpdateQueryStatus(*cause); - query_events_->MarkEvent("Cancelled"); - DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION); - } - // Get a copy of the coordinator pointer while holding 'lock_'. - coord = coord_.get(); - is_cancelled_ = true; - } // Release lock_ before doing cancellation work. - - // Cancel and close child queries before cancelling parent. 'lock_' should not be held - // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation - // involves RPCs and can take quite some time. 
- child_query_executor_->Cancel(); - - // Cancel the parent query. 'lock_' should not be held because cancellation involves - // RPCs and can block for a long time. - if (coord != NULL) coord->Cancel(cause); - return Status::OK(); -} - -Status ImpalaServer::QueryExecState::UpdateCatalog() { - if (!exec_request().__isset.query_exec_request || - exec_request().query_exec_request.stmt_type != TStmtType::DML) { - return Status::OK(); - } - - query_events_->MarkEvent("DML data written"); - SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer")); - - TQueryExecRequest query_exec_request = exec_request().query_exec_request; - if (query_exec_request.__isset.finalize_params) { - const TFinalizeParams& finalize_params = query_exec_request.finalize_params; - TUpdateCatalogRequest catalog_update; - catalog_update.__set_header(TCatalogServiceRequestHeader()); - catalog_update.header.__set_requesting_user(effective_user()); - if (!coord()->PrepareCatalogUpdate(&catalog_update)) { - VLOG_QUERY << "No partitions altered, not updating metastore (query id: " - << query_id() << ")"; - } else { - // TODO: We track partitions written to, not created, which means - // that we do more work than is necessary, because written-to - // partitions don't always require a metastore change. 
- VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size() - << " altered partitions (" - << join (catalog_update.created_partitions, ", ") << ")"; - - catalog_update.target_table = finalize_params.table_name; - catalog_update.db_name = finalize_params.table_db; - - Status cnxn_status; - const TNetworkAddress& address = - MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); - CatalogServiceConnection client( - exec_env_->catalogd_client_cache(), address, &cnxn_status); - RETURN_IF_ERROR(cnxn_status); - - VLOG_QUERY << "Executing FinalizeDml() using CatalogService"; - TUpdateCatalogResponse resp; - RETURN_IF_ERROR( - client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp)); - - Status status(resp.result.status); - if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); - RETURN_IF_ERROR(status); - RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result, - exec_request_.query_options.sync_ddl)); - } - } - query_events_->MarkEvent("DML Metastore update finished"); - return Status::OK(); -} - -void ImpalaServer::QueryExecState::SetResultSet(const TDdlExecResponse* ddl_resp) { - if (ddl_resp != NULL && ddl_resp->__isset.result_set) { - result_metadata_ = ddl_resp->result_set.schema; - request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows)); - } -} - -void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& results) { - request_result_set_.reset(new vector<TResultRow>); - request_result_set_->resize(results.size()); - for (int i = 0; i < results.size(); ++i) { - (*request_result_set_.get())[i].__isset.colVals = true; - (*request_result_set_.get())[i].colVals.resize(1); - (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]); - } -} - -void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& col1, - const vector<string>& col2) { - DCHECK_EQ(col1.size(), col2.size()); - - request_result_set_.reset(new 
vector<TResultRow>); - request_result_set_->resize(col1.size()); - for (int i = 0; i < col1.size(); ++i) { - (*request_result_set_.get())[i].__isset.colVals = true; - (*request_result_set_.get())[i].colVals.resize(2); - (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); - (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); - } -} - -void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& col1, - const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) { - DCHECK_EQ(col1.size(), col2.size()); - DCHECK_EQ(col1.size(), col3.size()); - DCHECK_EQ(col1.size(), col4.size()); - - request_result_set_.reset(new vector<TResultRow>); - request_result_set_->resize(col1.size()); - for (int i = 0; i < col1.size(); ++i) { - (*request_result_set_.get())[i].__isset.colVals = true; - (*request_result_set_.get())[i].colVals.resize(4); - (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); - (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); - (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]); - (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]); - } -} - -void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() { - DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT); - int64_t total_num_rows_inserted = 0; - // There will only be rows inserted in the case a new table was created as part of this - // operation. 
- if (catalog_op_executor_->ddl_exec_response()->new_table_created) { - DCHECK(coord_.get()); - for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) { - total_num_rows_inserted += p.second.num_modified_rows; - } - } - const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted); - VLOG_QUERY << summary_msg; - vector<string> results(1, summary_msg); - SetResultSet(results); -} - -void ImpalaServer::QueryExecState::MarkInactive() { - client_wait_sw_.Start(); - lock_guard<mutex> l(expiration_data_lock_); - last_active_time_ms_ = UnixMillis(); - DCHECK(ref_count_ > 0) << "Invalid MarkInactive()"; - --ref_count_; -} - -void ImpalaServer::QueryExecState::MarkActive() { - client_wait_sw_.Stop(); - int64_t elapsed_time = client_wait_sw_.ElapsedTime(); - client_wait_timer_->Set(elapsed_time); - lock_guard<mutex> l(expiration_data_lock_); - last_active_time_ms_ = UnixMillis(); - ++ref_count_; -} - -Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats( - const vector<ChildQuery*>& child_queries) { - DCHECK_GE(child_queries.size(), 1); - DCHECK_LE(child_queries.size(), 2); - catalog_op_executor_.reset( - new CatalogOpExecutor(exec_env_, frontend_, &server_profile_)); - - // If there was no column stats query, pass in empty thrift structures to - // ExecComputeStats(). Otherwise pass in the column stats result. 
- TTableSchema col_stats_schema; - TRowSet col_stats_data; - if (child_queries.size() > 1) { - col_stats_schema = child_queries[1]->result_schema(); - col_stats_data = child_queries[1]->result_data(); - } - - Status status = catalog_op_executor_->ExecComputeStats( - exec_request_.catalog_op_request.ddl_params.compute_stats_params, - child_queries[0]->result_schema(), - child_queries[0]->result_data(), - col_stats_schema, - col_stats_data); - { - lock_guard<mutex> l(lock_); - RETURN_IF_ERROR(UpdateQueryStatus(status)); - } - RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( - *catalog_op_executor_->update_catalog_result(), - exec_request_.query_options.sync_ddl)); - - // Set the results to be reported to the client. - SetResultSet(catalog_op_executor_->ddl_exec_response()); - query_events_->MarkEvent("Metastore update finished"); - return Status::OK(); -} - -void ImpalaServer::QueryExecState::ClearResultCache() { - if (result_cache_ == NULL) return; - // Update result set cache metrics and mem limit accounting. - ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size()); - int64_t total_bytes = result_cache_->ByteSize(); - ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes); - if (coord_ != NULL) { - DCHECK(coord_->query_mem_tracker() != NULL); - coord_->query_mem_tracker()->Release(total_bytes); - } - result_cache_.reset(NULL); -} -}
