http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index c886bf4..cfb0bf3 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -30,7 +30,7 @@ add_library(Service impala-hs2-server.cc impala-beeswax-server.cc impala-internal-service.cc - query-exec-state.cc + client-request-state.cc query-options.cc query-result-set.cc child-query.cc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/child-query.cc ---------------------------------------------------------------------- diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc index 5b68de4..f58aacc 100644 --- a/be/src/service/child-query.cc +++ b/be/src/service/child-query.cc @@ -17,7 +17,7 @@ #include "service/child-query.h" #include "service/impala-server.inline.h" -#include "service/query-exec-state.h" +#include "service/client-request-state.h" #include "service/query-options.h" #include "util/debug-util.h" @@ -34,7 +34,7 @@ const string ChildQuery::PARENT_QUERY_OPT = "impala.parent_query_id"; // any HS2 "RPC" into the impala server. It is important not to hold any locks (in // particular the parent query's lock_) while invoking HS2 functions to avoid deadlock. Status ChildQuery::ExecAndFetch() { - const TUniqueId& session_id = parent_exec_state_->session_id(); + const TUniqueId& session_id = parent_request_state_->session_id(); VLOG_QUERY << "Executing child query: " << query_ << " in session " << PrintId(session_id); @@ -45,8 +45,9 @@ Status ChildQuery::ExecAndFetch() { ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_id, &exec_stmt_req.sessionHandle.sessionId); exec_stmt_req.__set_statement(query_); - SetQueryOptions(parent_exec_state_->exec_request().query_options, &exec_stmt_req); - exec_stmt_req.confOverlay[PARENT_QUERY_OPT] = PrintId(parent_exec_state_->query_id()); + SetQueryOptions(parent_request_state_->exec_request().query_options, &exec_stmt_req); + exec_stmt_req.confOverlay[PARENT_QUERY_OPT] = + PrintId(parent_request_state_->query_id()); // Starting executing of the child query and setting is_running are not made atomic // because holding a lock while calling into the parent_server_ may result in deadlock. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/child-query.h ---------------------------------------------------------------------- diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h index 1c7a20e..36f6197 100644 --- a/be/src/service/child-query.h +++ b/be/src/service/child-query.h @@ -39,25 +39,25 @@ class ImpalaServer; // /// Parent queries are expected to call ExecAndWait() of a child query in a /// separate thread, and then join that thread to wait for child-query completion. -/// The parent QueryExecState is independent of the child query's QueryExecState, +/// The parent ClientRequestState is independent of the child query's ClientRequestState, /// with the exception that the child query selectively checks the parent's status /// for failure/cancellation detection. Child queries should never call into their -/// parent's QueryExecState to avoid deadlock. +/// parent's ClientRequestState to avoid deadlock. // /// TODO: Compute stats is the only stmt that requires child queries. Once the /// CatalogService performs background stats gathering the concept of child queries /// will likely become obsolete. Remove this class and all child-query related code. 
class ChildQuery { public: - ChildQuery(const std::string& query, ImpalaServer::QueryExecState* parent_exec_state, + ChildQuery(const std::string& query, ClientRequestState* parent_request_state, ImpalaServer* parent_server) : query_(query), - parent_exec_state_(parent_exec_state), + parent_request_state_(parent_request_state), parent_server_(parent_server), is_running_(false), is_cancelled_(false) { DCHECK(!query_.empty()); - DCHECK(parent_exec_state_ != NULL); + DCHECK(parent_request_state_ != NULL); DCHECK(parent_server_ != NULL); } @@ -65,7 +65,7 @@ class ChildQuery { /// (boost::mutex's operator= and copy c'tor are private) ChildQuery(const ChildQuery& other) : query_(other.query_), - parent_exec_state_(other.parent_exec_state_), + parent_request_state_(other.parent_request_state_), parent_server_(other.parent_server_), is_running_(other.is_running_), is_cancelled_(other.is_cancelled_) {} @@ -74,7 +74,7 @@ class ChildQuery { /// (boost::mutex's operator= and copy c'tor are private) ChildQuery& operator=(const ChildQuery& other) { query_ = other.query_; - parent_exec_state_ = other.parent_exec_state_; + parent_request_state_ = other.parent_request_state_; parent_server_ = other.parent_server_; is_running_ = other.is_running_; is_cancelled_ = other.is_cancelled_; @@ -85,11 +85,11 @@ class ChildQuery { Status ExecAndFetch(); /// Cancels and closes the given child query if it is running. Sets is_cancelled_. - /// Child queries can be cancelled by the parent query through QueryExecState::Cancel(). + /// Child queries can be cancelled by the parent query through ClientRequestState::Cancel(). /// Child queries should never cancel their parent to avoid deadlock (but the parent /// query may decide to cancel itself based on a non-OK status from a child query). - /// Note that child queries have a different QueryExecState than their parent query, - /// so cancellation of a child query does not call into the parent's QueryExecState. 
+ /// Note that child queries have a different ClientRequestState than their parent query, + /// so cancellation of a child query does not call into the parent's ClientRequestState. void Cancel(); const apache::hive::service::cli::thrift::TTableSchema& result_schema() { @@ -119,7 +119,7 @@ class ChildQuery { /// Execution state of parent query. Used to synchronize and propagate parent /// cancellations/failures to this child query. Not owned. - ImpalaServer::QueryExecState* parent_exec_state_; + ClientRequestState* parent_request_state_; /// Parent Impala server used for executing this child query. Not owned. ImpalaServer* parent_server_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc new file mode 100644 index 0000000..2f447c7 --- /dev/null +++ b/be/src/service/client-request-state.cc @@ -0,0 +1,1085 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "service/client-request-state.h" + +#include <limits> +#include <gutil/strings/substitute.h> + +#include "exprs/expr-context.h" +#include "exprs/expr.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/exec-env.h" +#include "scheduling/admission-controller.h" +#include "scheduling/scheduler.h" +#include "service/frontend.h" +#include "service/impala-server.h" +#include "service/query-options.h" +#include "service/query-result-set.h" +#include "util/debug-util.h" +#include "util/impalad-metrics.h" +#include "util/runtime-profile-counters.h" +#include "util/time.h" + +#include "gen-cpp/CatalogService.h" +#include "gen-cpp/CatalogService_types.h" + +#include <thrift/Thrift.h> + +#include "common/names.h" + +using boost::algorithm::join; +using namespace apache::hive::service::cli::thrift; +using namespace apache::thrift; +using namespace beeswax; +using namespace strings; + +DECLARE_int32(catalog_service_port); +DECLARE_string(catalog_service_host); +DECLARE_int64(max_result_cache_size); + +namespace impala { + +// Keys into the info string map of the runtime profile referring to specific +// items used by CM for monitoring purposes. 
+static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem"; +static const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory Reservation"; +static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats"; +static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats"; +static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids"; + +ClientRequestState::ClientRequestState( + const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend, + ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session) + : query_ctx_(query_ctx), + last_active_time_ms_(numeric_limits<int64_t>::max()), + ref_count_(0L), + child_query_executor_(new ChildQueryExecutor), + exec_env_(exec_env), + is_block_on_wait_joining_(false), + session_(session), + schedule_(NULL), + coord_(NULL), + result_cache_max_size_(-1), + profile_(&profile_pool_, "Query"), // assign name w/ id after planning + server_profile_(&profile_pool_, "ImpalaServer"), + summary_profile_(&profile_pool_, "Summary"), + is_cancelled_(false), + eos_(false), + query_state_(beeswax::QueryState::CREATED), + current_batch_(NULL), + current_batch_row_(0), + num_rows_fetched_(0), + fetched_rows_(false), + frontend_(frontend), + parent_server_(server), + start_time_(TimestampValue::LocalTime()) { +#ifndef NDEBUG + profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a " + "DEBUG build of Impala. 
Use RELEASE builds to measure query performance."); +#endif + row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer"); + client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer"); + query_events_ = summary_profile_.AddEventSequence("Query Timeline"); + query_events_->Start(); + profile_.AddChild(&summary_profile_); + + profile_.set_name("Query (id=" + PrintId(query_id()) + ")"); + summary_profile_.AddInfoString("Session ID", PrintId(session_id())); + summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type())); + if (session_type() == TSessionType::HIVESERVER2) { + summary_profile_.AddInfoString("HiveServer2 Protocol Version", + Substitute("V$0", 1 + session->hs2_version)); + } + summary_profile_.AddInfoString("Start Time", start_time().DebugString()); + summary_profile_.AddInfoString("End Time", ""); + summary_profile_.AddInfoString("Query Type", "N/A"); + summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); + summary_profile_.AddInfoString("Query Status", "OK"); + summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true)); + summary_profile_.AddInfoString("User", effective_user()); + summary_profile_.AddInfoString("Connected User", connected_user()); + summary_profile_.AddInfoString("Delegated User", do_as_user()); + summary_profile_.AddInfoString("Network Address", + lexical_cast<string>(session_->network_address)); + summary_profile_.AddInfoString("Default Db", default_db()); + summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt); + summary_profile_.AddInfoString("Coordinator", + TNetworkAddressToString(exec_env->backend_address())); +} + +ClientRequestState::~ClientRequestState() { + DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!"; +} + +Status ClientRequestState::SetResultCache(QueryResultSet* cache, + int64_t max_size) { + lock_guard<mutex> l(lock_); + DCHECK(result_cache_ == NULL); + 
result_cache_.reset(cache); + if (max_size > FLAGS_max_result_cache_size) { + return Status( + Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.", + max_size, FLAGS_max_result_cache_size)); + } + result_cache_max_size_ = max_size; + return Status::OK(); +} + +Status ClientRequestState::Exec(TExecRequest* exec_request) { + MarkActive(); + exec_request_ = *exec_request; + + profile_.AddChild(&server_profile_); + summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type())); + summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); + summary_profile_.AddInfoString("Query Options (non default)", + DebugQueryOptions(query_ctx_.client_request.query_options)); + + switch (exec_request->stmt_type) { + case TStmtType::QUERY: + case TStmtType::DML: + DCHECK(exec_request_.__isset.query_exec_request); + return ExecQueryOrDmlRequest(exec_request_.query_exec_request); + case TStmtType::EXPLAIN: { + request_result_set_.reset(new vector<TResultRow>( + exec_request_.explain_result.results)); + return Status::OK(); + } + case TStmtType::DDL: { + DCHECK(exec_request_.__isset.catalog_op_request); + return ExecDdlRequest(); + } + case TStmtType::LOAD: { + DCHECK(exec_request_.__isset.load_data_request); + TLoadDataResp response; + RETURN_IF_ERROR( + frontend_->LoadData(exec_request_.load_data_request, &response)); + request_result_set_.reset(new vector<TResultRow>); + request_result_set_->push_back(response.load_summary); + + // Now refresh the table metadata. 
+ TCatalogOpRequest reset_req; + reset_req.__set_op_type(TCatalogOpType::RESET_METADATA); + reset_req.__set_reset_metadata_params(TResetMetadataRequest()); + reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader()); + reset_req.reset_metadata_params.__set_is_refresh(true); + reset_req.reset_metadata_params.__set_table_name( + exec_request_.load_data_request.table_name); + catalog_op_executor_.reset( + new CatalogOpExecutor(exec_env_, frontend_, &server_profile_)); + RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req)); + RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( + *catalog_op_executor_->update_catalog_result(), + exec_request_.query_options.sync_ddl)); + return Status::OK(); + } + case TStmtType::SET: { + DCHECK(exec_request_.__isset.set_query_option_request); + lock_guard<mutex> l(session_->lock); + if (exec_request_.set_query_option_request.__isset.key) { + // "SET key=value" updates the session query options. + DCHECK(exec_request_.set_query_option_request.__isset.value); + RETURN_IF_ERROR(SetQueryOption( + exec_request_.set_query_option_request.key, + exec_request_.set_query_option_request.value, + &session_->default_query_options, + &session_->set_query_options_mask)); + SetResultSet({}, {}); + } else { + // "SET" returns a table of all query options. 
+ map<string, string> config; + TQueryOptionsToMap( + session_->default_query_options, &config); + vector<string> keys, values; + map<string, string>::const_iterator itr = config.begin(); + for (; itr != config.end(); ++itr) { + keys.push_back(itr->first); + values.push_back(itr->second); + } + SetResultSet(keys, values); + } + return Status::OK(); + } + default: + stringstream errmsg; + errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type; + return Status(errmsg.str()); + } +} + +Status ClientRequestState::ExecLocalCatalogOp( + const TCatalogOpRequest& catalog_op) { + switch (catalog_op.op_type) { + case TCatalogOpType::USE: { + lock_guard<mutex> l(session_->lock); + session_->database = exec_request_.catalog_op_request.use_db_params.db; + return Status::OK(); + } + case TCatalogOpType::SHOW_TABLES: { + const TShowTablesParams* params = &catalog_op.show_tables_params; + // A NULL pattern means match all tables. However, Thrift string types can't + // be NULL in C++, so we have to test if it's set rather than just blindly + // using the value. + const string* table_name = + params->__isset.show_pattern ? &(params->show_pattern) : NULL; + TGetTablesResult table_names; + RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name, + &query_ctx_.session, &table_names)); + SetResultSet(table_names.tables); + return Status::OK(); + } + case TCatalogOpType::SHOW_DBS: { + const TShowDbsParams* params = &catalog_op.show_dbs_params; + TGetDbsResult dbs; + const string* db_pattern = + params->__isset.show_pattern ? 
(&params->show_pattern) : NULL; + RETURN_IF_ERROR( + frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs)); + vector<string> names, comments; + names.reserve(dbs.dbs.size()); + comments.reserve(dbs.dbs.size()); + for (const TDatabase& db: dbs.dbs) { + names.push_back(db.db_name); + comments.push_back(db.metastore_db.description); + } + SetResultSet(names, comments); + return Status::OK(); + } + case TCatalogOpType::SHOW_DATA_SRCS: { + const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params; + TGetDataSrcsResult result; + const string* pattern = + params->__isset.show_pattern ? (&params->show_pattern) : NULL; + RETURN_IF_ERROR( + frontend_->GetDataSrcMetadata(pattern, &result)); + SetResultSet(result.data_src_names, result.locations, result.class_names, + result.api_versions); + return Status::OK(); + } + case TCatalogOpType::SHOW_STATS: { + const TShowStatsParams& params = catalog_op.show_stats_params; + TResultSet response; + RETURN_IF_ERROR(frontend_->GetStats(params, &response)); + // Set the result set and its schema from the response. + request_result_set_.reset(new vector<TResultRow>(response.rows)); + result_metadata_ = response.schema; + return Status::OK(); + } + case TCatalogOpType::SHOW_FUNCTIONS: { + const TShowFunctionsParams* params = &catalog_op.show_fns_params; + TGetFunctionsResult functions; + const string* fn_pattern = + params->__isset.show_pattern ? (&params->show_pattern) : NULL; + RETURN_IF_ERROR(frontend_->GetFunctions( + params->category, params->db, fn_pattern, &query_ctx_.session, &functions)); + SetResultSet(functions.fn_ret_types, functions.fn_signatures, + functions.fn_binary_types, functions.fn_persistence); + return Status::OK(); + } + case TCatalogOpType::SHOW_ROLES: { + const TShowRolesParams& params = catalog_op.show_roles_params; + if (params.is_admin_op) { + // Verify the user has privileges to perform this operation by checking against + // the Sentry Service (via the Catalog Server). 
+ catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, + &server_profile_)); + + TSentryAdminCheckRequest req; + req.__set_header(TCatalogServiceRequestHeader()); + req.header.__set_requesting_user(effective_user()); + RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req)); + } + + // If we have made it here, the user has privileges to execute this operation. + // Return the results. + TShowRolesResult result; + RETURN_IF_ERROR(frontend_->ShowRoles(params, &result)); + SetResultSet(result.role_names); + return Status::OK(); + } + case TCatalogOpType::SHOW_GRANT_ROLE: { + const TShowGrantRoleParams& params = catalog_op.show_grant_role_params; + if (params.is_admin_op) { + // Verify the user has privileges to perform this operation by checking against + // the Sentry Service (via the Catalog Server). + catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, + &server_profile_)); + + TSentryAdminCheckRequest req; + req.__set_header(TCatalogServiceRequestHeader()); + req.header.__set_requesting_user(effective_user()); + RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req)); + } + + TResultSet response; + RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response)); + // Set the result set and its schema from the response. 
+ request_result_set_.reset(new vector<TResultRow>(response.rows)); + result_metadata_ = response.schema; + return Status::OK(); + } + case TCatalogOpType::DESCRIBE_DB: { + TDescribeResult response; + RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params, + &response)); + // Set the result set + request_result_set_.reset(new vector<TResultRow>(response.results)); + return Status::OK(); + } + case TCatalogOpType::DESCRIBE_TABLE: { + TDescribeResult response; + RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params, + &response)); + // Set the result set + request_result_set_.reset(new vector<TResultRow>(response.results)); + return Status::OK(); + } + case TCatalogOpType::SHOW_CREATE_TABLE: { + string response; + RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params, + &response)); + SetResultSet(vector<string>(1, response)); + return Status::OK(); + } + case TCatalogOpType::SHOW_CREATE_FUNCTION: { + string response; + RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params, + &response)); + SetResultSet(vector<string>(1, response)); + return Status::OK(); + } + case TCatalogOpType::SHOW_FILES: { + TResultSet response; + RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response)); + // Set the result set and its schema from the response. 
+ request_result_set_.reset(new vector<TResultRow>(response.rows)); + result_metadata_ = response.schema; + return Status::OK(); + } + default: { + stringstream ss; + ss << "Unexpected TCatalogOpType: " << catalog_op.op_type; + return Status(ss.str()); + } + } +} + +Status ClientRequestState::ExecQueryOrDmlRequest( + const TQueryExecRequest& query_exec_request) { + // we always need at least one plan fragment + DCHECK(query_exec_request.plan_exec_info.size() > 0); + + if (query_exec_request.__isset.query_plan) { + stringstream plan_ss; + // Add some delimiters to make it clearer where the plan + // begins and the profile ends + plan_ss << "\n----------------\n" + << query_exec_request.query_plan + << "----------------"; + summary_profile_.AddInfoString("Plan", plan_ss.str()); + } + // Add info strings consumed by CM: Estimated mem and tables missing stats. + if (query_exec_request.__isset.per_host_mem_estimate) { + stringstream ss; + ss << query_exec_request.per_host_mem_estimate; + summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str()); + } + if (query_exec_request.__isset.per_host_min_reservation) { + stringstream ss; + ss << query_exec_request.per_host_min_reservation; + summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str()); + } + if (!query_exec_request.query_ctx.__isset.parent_query_id && + query_exec_request.query_ctx.__isset.tables_missing_stats && + !query_exec_request.query_ctx.tables_missing_stats.empty()) { + stringstream ss; + const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats; + for (int i = 0; i < tbls.size(); ++i) { + if (i != 0) ss << ","; + ss << tbls[i].db_name << "." 
<< tbls[i].table_name; + } + summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str()); + } + + if (!query_exec_request.query_ctx.__isset.parent_query_id && + query_exec_request.query_ctx.__isset.tables_with_corrupt_stats && + !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) { + stringstream ss; + const vector<TTableName>& tbls = + query_exec_request.query_ctx.tables_with_corrupt_stats; + for (int i = 0; i < tbls.size(); ++i) { + if (i != 0) ss << ","; + ss << tbls[i].db_name << "." << tbls[i].table_name; + } + summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); + } + + if (query_exec_request.query_ctx.__isset.tables_missing_diskids && + !query_exec_request.query_ctx.tables_missing_diskids.empty()) { + stringstream ss; + const vector<TTableName>& tbls = + query_exec_request.query_ctx.tables_missing_diskids; + for (int i = 0; i < tbls.size(); ++i) { + if (i != 0) ss << ","; + ss << tbls[i].db_name << "." << tbls[i].table_name; + } + summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str()); + } + + { + lock_guard<mutex> l(lock_); + // Don't start executing the query if Cancel() was called concurrently with Exec(). 
+ if (is_cancelled_) return Status::CANCELLED; + // TODO: make schedule local to coordinator and move schedule_->Release() into + // Coordinator::TearDown() + schedule_.reset(new QuerySchedule(query_id(), query_exec_request, + exec_request_.query_options, &summary_profile_, query_events_)); + } + Status status = exec_env_->scheduler()->Schedule(schedule_.get()); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + + if (exec_env_->admission_controller() != nullptr) { + status = exec_env_->admission_controller()->AdmitQuery(schedule_.get()); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + } + + coord_.reset(new Coordinator(*schedule_, query_events_)); + status = coord_->Exec(); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + + profile_.AddChild(coord_->query_profile()); + return Status::OK(); +} + +Status ClientRequestState::ExecDdlRequest() { + string op_type = catalog_op_type() == TCatalogOpType::DDL ? + PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type()); + summary_profile_.AddInfoString("DDL Type", op_type); + + if (catalog_op_type() != TCatalogOpType::DDL && + catalog_op_type() != TCatalogOpType::RESET_METADATA) { + Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request); + lock_guard<mutex> l(lock_); + return UpdateQueryStatus(status); + } + + if (ddl_type() == TDdlType::COMPUTE_STATS) { + TComputeStatsParams& compute_stats_params = + exec_request_.catalog_op_request.ddl_params.compute_stats_params; + // Add child queries for computing table and column stats. 
+ vector<ChildQuery> child_queries; + if (compute_stats_params.__isset.tbl_stats_query) { + child_queries.push_back( + ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_)); + } + if (compute_stats_params.__isset.col_stats_query) { + child_queries.push_back( + ChildQuery(compute_stats_params.col_stats_query, this, parent_server_)); + } + + if (child_queries.size() > 0) child_query_executor_->ExecAsync(move(child_queries)); + return Status::OK(); + } + + catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, + &server_profile_)); + Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + + // If this is a CTAS request, there will usually be more work to do + // after executing the CREATE TABLE statement (the INSERT portion of the operation). + // The exception is if the user specified IF NOT EXISTS and the table already + // existed, in which case we do not execute the INSERT. + if (catalog_op_type() == TCatalogOpType::DDL && + ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT && + !catalog_op_executor_->ddl_exec_response()->new_table_created) { + DCHECK(exec_request_.catalog_op_request. + ddl_params.create_table_params.if_not_exists); + return Status::OK(); + } + + // Add newly created table to catalog cache. + RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( + *catalog_op_executor_->update_catalog_result(), + exec_request_.query_options.sync_ddl)); + + if (catalog_op_type() == TCatalogOpType::DDL && + ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) { + // At this point, the remainder of the CTAS request executes + // like a normal DML request. As with other DML requests, it will + // wait for another catalog update if any partitions were altered as a result + // of the operation. 
+ DCHECK(exec_request_.__isset.query_exec_request); + RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request)); + } + + // Set the results to be reported to the client. + SetResultSet(catalog_op_executor_->ddl_exec_response()); + return Status::OK(); +} + +void ClientRequestState::Done() { + MarkActive(); + // Make sure we join on wait_thread_ before we finish (and especially before this object + // is destroyed). + BlockOnWait(); + + // Update latest observed Kudu timestamp stored in the session from the coordinator. + // Needs to take the session_ lock which must not be taken while holding lock_, so this + // must happen before taking lock_ below. + if (coord_.get() != NULL) { + // This is safe to access on coord_ after Wait() has been called. + uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp(); + if (latest_kudu_ts > 0) { + VLOG_RPC << "Updating session (id=" << session_id() << ") with latest " + << "observed Kudu timestamp: " << latest_kudu_ts; + lock_guard<mutex> session_lock(session_->lock); + session_->kudu_latest_observed_ts = std::max<uint64_t>( + session_->kudu_latest_observed_ts, latest_kudu_ts); + } + } + + unique_lock<mutex> l(lock_); + end_time_ = TimestampValue::LocalTime(); + summary_profile_.AddInfoString("End Time", end_time().DebugString()); + summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); + query_events_->MarkEvent("Unregister query"); + + // Update result set cache metrics, and update mem limit accounting before tearing + // down the coordinator. + ClearResultCache(); + + if (coord_.get() != NULL) { + // Release any reserved resources. 
+ if (exec_env_->admission_controller() != nullptr) { + Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get()); + if (!status.ok()) { + LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id() + << " because of error: " << status.GetDetail(); + } + } + coord_->TearDown(); + } +} + +Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) { + TResultSet metadata_op_result; + // Like the other Exec(), fill out as much profile information as we're able to. + summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL)); + summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); + RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request, + &metadata_op_result)); + result_metadata_ = metadata_op_result.schema; + request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows)); + return Status::OK(); +} + +void ClientRequestState::WaitAsync() { + wait_thread_.reset(new Thread( + "query-exec-state", "wait-thread", &ClientRequestState::Wait, this)); +} + +void ClientRequestState::BlockOnWait() { + unique_lock<mutex> l(lock_); + if (wait_thread_.get() == NULL) return; + if (!is_block_on_wait_joining_) { + // No other thread is already joining on wait_thread_, so this thread needs to do + // it. Other threads will need to block on the cond-var. + is_block_on_wait_joining_ = true; + l.unlock(); + wait_thread_->Join(); + l.lock(); + is_block_on_wait_joining_ = false; + wait_thread_.reset(); + block_on_wait_cv_.notify_all(); + } else { + // Another thread is already joining with wait_thread_. Block on the cond-var + // until the Join() executed in the other thread has completed. 
+ do { + block_on_wait_cv_.wait(l); + } while (is_block_on_wait_joining_); + } +} + +void ClientRequestState::Wait() { + // block until results are ready + Status status = WaitInternal(); + { + lock_guard<mutex> l(lock_); + if (returns_result_set()) { + query_events()->MarkEvent("Rows available"); + } else { + query_events()->MarkEvent("Request finished"); + } + (void) UpdateQueryStatus(status); + } + if (status.ok()) { + UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); + } +} + +Status ClientRequestState::WaitInternal() { + // Explain requests have already populated the result set. Nothing to do here. + if (exec_request_.stmt_type == TStmtType::EXPLAIN) { + MarkInactive(); + return Status::OK(); + } + + vector<ChildQuery*> child_queries; + Status child_queries_status = child_query_executor_->WaitForAll(&child_queries); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(query_status_); + RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status)); + } + if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished"); + + if (coord_.get() != NULL) { + RETURN_IF_ERROR(coord_->Wait()); + RETURN_IF_ERROR(UpdateCatalog()); + } + + if (catalog_op_type() == TCatalogOpType::DDL && + ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) { + RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries)); + } + + if (!returns_result_set()) { + // Queries that do not return a result are finished at this point. This includes + // DML operations and a subset of the DDL operations. + eos_ = true; + } else if (catalog_op_type() == TCatalogOpType::DDL && + ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) { + SetCreateTableAsSelectResultSet(); + } + // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks + // how long Impala waits for the client to fetch rows. For other statements, track the + // time until a Close() is received. 
+ MarkInactive(); + return Status::OK(); +} + +Status ClientRequestState::FetchRows(const int32_t max_rows, + QueryResultSet* fetched_rows) { + // Pause the wait timer, since the client has instructed us to do work on its behalf. + MarkActive(); + + // ImpalaServer::FetchInternal has already taken our lock_ + (void) UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows)); + + MarkInactive(); + return query_status_; +} + +Status ClientRequestState::RestartFetch() { + // No result caching for this query. Restart is invalid. + if (result_cache_max_size_ <= 0) { + return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, + "Restarting of fetch requires enabling of query result caching.")); + } + // The cache overflowed on a previous fetch. + if (result_cache_.get() == NULL) { + stringstream ss; + ss << "The query result cache exceeded its limit of " << result_cache_max_size_ + << " rows. Restarting the fetch is not possible."; + return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str())); + } + // Reset fetch state to start over. 
+ eos_ = false; + num_rows_fetched_ = 0; + return Status::OK(); +} + +void ClientRequestState::UpdateNonErrorQueryState( + beeswax::QueryState::type query_state) { + lock_guard<mutex> l(lock_); + DCHECK(query_state != beeswax::QueryState::EXCEPTION); + if (query_state_ < query_state) query_state_ = query_state; +} + +Status ClientRequestState::UpdateQueryStatus(const Status& status) { + // Preserve the first non-ok status + if (!status.ok() && query_status_.ok()) { + query_state_ = beeswax::QueryState::EXCEPTION; + query_status_ = status; + summary_profile_.AddInfoString("Query Status", query_status_.GetDetail()); + } + + return status; +} + +Status ClientRequestState::FetchRowsInternal(const int32_t max_rows, + QueryResultSet* fetched_rows) { + DCHECK(query_state_ != beeswax::QueryState::EXCEPTION); + + if (eos_) return Status::OK(); + + if (request_result_set_ != NULL) { + query_state_ = beeswax::QueryState::FINISHED; + int num_rows = 0; + const vector<TResultRow>& all_rows = (*(request_result_set_.get())); + // max_rows <= 0 means no limit + while ((num_rows < max_rows || max_rows <= 0) + && num_rows_fetched_ < all_rows.size()) { + fetched_rows->AddOneRow(all_rows[num_rows_fetched_]); + ++num_rows_fetched_; + ++num_rows; + } + eos_ = (num_rows_fetched_ == all_rows.size()); + return Status::OK(); + } + + if (coord_.get() == nullptr) { + return Status("Client tried to fetch rows on a query that produces no results."); + } + + int32_t num_rows_fetched_from_cache = 0; + if (result_cache_max_size_ > 0 && result_cache_ != NULL) { + // Satisfy the fetch from the result cache if possible. + int cache_fetch_size = (max_rows <= 0) ? 
result_cache_->size() : max_rows; + num_rows_fetched_from_cache = + fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size); + num_rows_fetched_ += num_rows_fetched_from_cache; + if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); + } + + query_state_ = beeswax::QueryState::FINISHED; // results will be ready after this call + + // Maximum number of rows to be fetched from the coord. + int32_t max_coord_rows = max_rows; + if (max_rows > 0) { + DCHECK_LE(num_rows_fetched_from_cache, max_rows); + max_coord_rows = max_rows - num_rows_fetched_from_cache; + } + { + SCOPED_TIMER(row_materialization_timer_); + size_t before = fetched_rows->size(); + // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_ + // (already held) ensures that we do not call coord_->GetNext() multiple times + // concurrently. + // TODO: Simplify this. + lock_.unlock(); + Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_); + lock_.lock(); + int num_fetched = fetched_rows->size() - before; + DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute( + "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows); + num_rows_fetched_ += num_fetched; + + RETURN_IF_ERROR(status); + // Check if query status has changed during GetNext() call + if (!query_status_.ok()) { + eos_ = true; + return query_status_; + } + } + + // Update the result cache if necessary. + if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) { + int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache; + if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) { + // Set the cache to NULL to indicate that adding the rows fetched from the coord + // would exceed the bound of the cache, and therefore, RestartFetch() should fail. 
+ ClearResultCache(); + return Status::OK(); + } + + // We guess the size of the cache after adding fetched_rows by looking at the size of + // fetched_rows itself, and using this estimate to confirm that the memtracker will + // allow us to use this much extra memory. In fact, this might be an overestimate, as + // the size of two result sets combined into one is not always the size of both result + // sets added together (the best example is the null bitset for each column: it might + // have only one entry in each result set, and as a result consume two bytes, but when + // the result sets are combined, only one byte is needed). Therefore after we add the + // new result set into the cache, we need to fix up the memory consumption to the + // actual levels to ensure we don't 'leak' bytes that we aren't using. + int64_t before = result_cache_->ByteSize(); + + // Upper-bound on memory required to add fetched_rows to the cache. + int64_t delta_bytes = + fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size()); + MemTracker* query_mem_tracker = coord_->query_mem_tracker(); + // Count the cached rows towards the mem limit. + if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) { + string details("Failed to allocate memory for result cache."); + return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details, + delta_bytes); + } + // Append all rows fetched from the coordinator into the cache. + int num_rows_added = result_cache_->AddRows( + fetched_rows, num_rows_fetched_from_cache, fetched_rows->size()); + + int64_t after = result_cache_->ByteSize(); + + // Confirm that this was not an underestimate of the memory required. 
+ DCHECK_GE(before + delta_bytes, after) + << "Combined result sets consume more memory than both individually " + << Substitute("(before: $0, delta_bytes: $1, after: $2)", + before, delta_bytes, after); + + // Fix up the tracked values + if (before + delta_bytes > after) { + query_mem_tracker->Release(before + delta_bytes - after); + delta_bytes = after - before; + } + + // Update result set cache metrics. + ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added); + ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes); + } + + return Status::OK(); +} + +Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) { + if (check_inflight) { + // If the query is in 'inflight_queries' it means that the query has actually started + // executing. It is ok if the query is removed from 'inflight_queries' during + // cancellation, so we can release the session lock before starting the cancellation + // work. + lock_guard<mutex> session_lock(session_->lock); + if (session_->inflight_queries.find(query_id()) == session_->inflight_queries.end()) { + return Status("Query not yet running"); + } + } + + Coordinator* coord; + { + lock_guard<mutex> lock(lock_); + // If the query is completed or cancelled, no need to update state. + bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION; + if (!already_done && cause != NULL) { + DCHECK(!cause->ok()); + (void) UpdateQueryStatus(*cause); + query_events_->MarkEvent("Cancelled"); + DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION); + } + // Get a copy of the coordinator pointer while holding 'lock_'. + coord = coord_.get(); + is_cancelled_ = true; + } // Release lock_ before doing cancellation work. + + // Cancel and close child queries before cancelling parent. 'lock_' should not be held + // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation + // involves RPCs and can take quite some time. 
+ child_query_executor_->Cancel(); + + // Cancel the parent query. 'lock_' should not be held because cancellation involves + // RPCs and can block for a long time. + if (coord != NULL) coord->Cancel(cause); + return Status::OK(); +} + +Status ClientRequestState::UpdateCatalog() { + if (!exec_request().__isset.query_exec_request || + exec_request().query_exec_request.stmt_type != TStmtType::DML) { + return Status::OK(); + } + + query_events_->MarkEvent("DML data written"); + SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer")); + + TQueryExecRequest query_exec_request = exec_request().query_exec_request; + if (query_exec_request.__isset.finalize_params) { + const TFinalizeParams& finalize_params = query_exec_request.finalize_params; + TUpdateCatalogRequest catalog_update; + catalog_update.__set_header(TCatalogServiceRequestHeader()); + catalog_update.header.__set_requesting_user(effective_user()); + if (!coord()->PrepareCatalogUpdate(&catalog_update)) { + VLOG_QUERY << "No partitions altered, not updating metastore (query id: " + << query_id() << ")"; + } else { + // TODO: We track partitions written to, not created, which means + // that we do more work than is necessary, because written-to + // partitions don't always require a metastore change. 
+ VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size() + << " altered partitions (" + << join (catalog_update.created_partitions, ", ") << ")"; + + catalog_update.target_table = finalize_params.table_name; + catalog_update.db_name = finalize_params.table_db; + + Status cnxn_status; + const TNetworkAddress& address = + MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); + CatalogServiceConnection client( + exec_env_->catalogd_client_cache(), address, &cnxn_status); + RETURN_IF_ERROR(cnxn_status); + + VLOG_QUERY << "Executing FinalizeDml() using CatalogService"; + TUpdateCatalogResponse resp; + RETURN_IF_ERROR( + client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp)); + + Status status(resp.result.status); + if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); + RETURN_IF_ERROR(status); + RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result, + exec_request_.query_options.sync_ddl)); + } + } + query_events_->MarkEvent("DML Metastore update finished"); + return Status::OK(); +} + +void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) { + if (ddl_resp != NULL && ddl_resp->__isset.result_set) { + result_metadata_ = ddl_resp->result_set.schema; + request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows)); + } +} + +void ClientRequestState::SetResultSet(const vector<string>& results) { + request_result_set_.reset(new vector<TResultRow>); + request_result_set_->resize(results.size()); + for (int i = 0; i < results.size(); ++i) { + (*request_result_set_.get())[i].__isset.colVals = true; + (*request_result_set_.get())[i].colVals.resize(1); + (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]); + } +} + +void ClientRequestState::SetResultSet(const vector<string>& col1, + const vector<string>& col2) { + DCHECK_EQ(col1.size(), col2.size()); + + request_result_set_.reset(new vector<TResultRow>); + 
request_result_set_->resize(col1.size()); + for (int i = 0; i < col1.size(); ++i) { + (*request_result_set_.get())[i].__isset.colVals = true; + (*request_result_set_.get())[i].colVals.resize(2); + (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); + (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); + } +} + +void ClientRequestState::SetResultSet(const vector<string>& col1, + const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) { + DCHECK_EQ(col1.size(), col2.size()); + DCHECK_EQ(col1.size(), col3.size()); + DCHECK_EQ(col1.size(), col4.size()); + + request_result_set_.reset(new vector<TResultRow>); + request_result_set_->resize(col1.size()); + for (int i = 0; i < col1.size(); ++i) { + (*request_result_set_.get())[i].__isset.colVals = true; + (*request_result_set_.get())[i].colVals.resize(4); + (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); + (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); + (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]); + (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]); + } +} + +void ClientRequestState::SetCreateTableAsSelectResultSet() { + DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT); + int64_t total_num_rows_inserted = 0; + // There will only be rows inserted in the case a new table was created as part of this + // operation. 
+ if (catalog_op_executor_->ddl_exec_response()->new_table_created) { + DCHECK(coord_.get()); + for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) { + total_num_rows_inserted += p.second.num_modified_rows; + } + } + const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted); + VLOG_QUERY << summary_msg; + vector<string> results(1, summary_msg); + SetResultSet(results); +} + +void ClientRequestState::MarkInactive() { + client_wait_sw_.Start(); + lock_guard<mutex> l(expiration_data_lock_); + last_active_time_ms_ = UnixMillis(); + DCHECK(ref_count_ > 0) << "Invalid MarkInactive()"; + --ref_count_; +} + +void ClientRequestState::MarkActive() { + client_wait_sw_.Stop(); + int64_t elapsed_time = client_wait_sw_.ElapsedTime(); + client_wait_timer_->Set(elapsed_time); + lock_guard<mutex> l(expiration_data_lock_); + last_active_time_ms_ = UnixMillis(); + ++ref_count_; +} + +Status ClientRequestState::UpdateTableAndColumnStats( + const vector<ChildQuery*>& child_queries) { + DCHECK_GE(child_queries.size(), 1); + DCHECK_LE(child_queries.size(), 2); + catalog_op_executor_.reset( + new CatalogOpExecutor(exec_env_, frontend_, &server_profile_)); + + // If there was no column stats query, pass in empty thrift structures to + // ExecComputeStats(). Otherwise pass in the column stats result. 
+ TTableSchema col_stats_schema; + TRowSet col_stats_data; + if (child_queries.size() > 1) { + col_stats_schema = child_queries[1]->result_schema(); + col_stats_data = child_queries[1]->result_data(); + } + + Status status = catalog_op_executor_->ExecComputeStats( + exec_request_.catalog_op_request.ddl_params.compute_stats_params, + child_queries[0]->result_schema(), + child_queries[0]->result_data(), + col_stats_schema, + col_stats_data); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( + *catalog_op_executor_->update_catalog_result(), + exec_request_.query_options.sync_ddl)); + + // Set the results to be reported to the client. + SetResultSet(catalog_op_executor_->ddl_exec_response()); + query_events_->MarkEvent("Metastore update finished"); + return Status::OK(); +} + +void ClientRequestState::ClearResultCache() { + if (result_cache_ == NULL) return; + // Update result set cache metrics and mem limit accounting. + ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size()); + int64_t total_bytes = result_cache_->ByteSize(); + ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes); + if (coord_ != NULL) { + DCHECK(coord_->query_mem_tracker() != NULL); + coord_->query_mem_tracker()->Release(total_bytes); + } + result_cache_.reset(NULL); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/client-request-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h new file mode 100644 index 0000000..0e18957 --- /dev/null +++ b/be/src/service/client-request-state.h @@ -0,0 +1,413 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_SERVICE_CLIENT_REQUEST_STATE_H +#define IMPALA_SERVICE_CLIENT_REQUEST_STATE_H + +#include "common/status.h" +#include "exec/catalog-op-executor.h" +#include "runtime/timestamp-value.h" +#include "scheduling/query-schedule.h" +#include "service/child-query.h" +#include "service/impala-server.h" +#include "util/auth-util.h" +#include "util/runtime-profile.h" +#include "gen-cpp/Frontend_types.h" +#include "gen-cpp/Frontend_types.h" + +#include <boost/thread.hpp> +#include <boost/unordered_set.hpp> +#include <vector> + +namespace impala { + +class ExecEnv; +class Coordinator; +class RuntimeState; +class RowBatch; +class Expr; +class TupleRow; +class Frontend; +class ClientRequestStateCleaner; + +/// Execution state of the client-facing side of a query. This captures everything +/// necessary to convert row batches received by the coordinator into results +/// we can return to the client. It also captures all state required for +/// servicing query-related requests from the client. +/// Thread safety: this class is generally not thread-safe, callers need to +/// synchronize access explicitly via lock(). See the ImpalaServer class comment for +/// the required lock acquisition order. +/// +/// TODO: Compute stats is the only stmt that requires child queries. 
Once the +/// CatalogService performs background stats gathering the concept of child queries +/// will likely become obsolete. Remove all child-query related code from this class. +class ClientRequestState { + public: + ClientRequestState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend, + ImpalaServer* server, std::shared_ptr<ImpalaServer::SessionState> session); + + ~ClientRequestState(); + + /// Initiates execution of an exec_request. + /// Non-blocking. + /// Must *not* be called with lock_ held. + Status Exec(TExecRequest* exec_request) WARN_UNUSED_RESULT; + + /// Execute a HiveServer2 metadata operation + /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different + /// code paths. + Status Exec(const TMetadataOpRequest& exec_request) WARN_UNUSED_RESULT; + + /// Call this to ensure that rows are ready when calling FetchRows(). Updates the + /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded + /// by call to Exec(). Waits for all child queries to complete. Takes lock_. + void Wait(); + + /// Calls Wait() asynchronously in a thread and returns immediately. + void WaitAsync(); + + /// BlockOnWait() may be called after WaitAsync() has been called in order to wait + /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this + /// from multiple threads (all threads will block until wait_thread_ has completed) + /// and multiple times (non-blocking once wait_thread_ has completed). Do not call + /// while holding lock_. + void BlockOnWait(); + + /// Return at most max_rows from the current batch. If the entire current batch has + /// been returned, fetch another batch first. + /// Caller needs to hold fetch_rows_lock_ and lock_. + /// Caller should verify that EOS has not been reached before calling. + /// Must be preceded by call to Wait() (or WaitAsync()/BlockOnWait()). + /// Also updates query_state_/status_ in case of error. 
+ Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows) + WARN_UNUSED_RESULT; + + /// Resets the state of this query such that the next fetch() returns results from the + /// beginning of the query result set (by using the result_cache_). + /// It is valid to call this function for any type of statement that returns a result + /// set, including queries, show stmts, compute stats, etc. + /// Returns a recoverable error status if the restart is not possible, ok() otherwise. + /// The error is recoverable to allow clients to resume fetching. + /// The caller must hold fetch_rows_lock_ and lock_. + Status RestartFetch() WARN_UNUSED_RESULT; + + /// Update query state if the requested state isn't already obsolete. This is only for + /// non-error states - if the query encounters an error the query status needs to be set + /// with information about the error so UpdateQueryStatus must be used instead. + /// Takes lock_. + void UpdateNonErrorQueryState(beeswax::QueryState::type query_state); + + /// Update the query status and the "Query Status" summary profile string. + /// If current status is already != ok, no update is made (we preserve the first error) + /// If called with a non-ok argument, the expectation is that the query will be aborted + /// quickly. + /// Returns the status argument (so we can write + /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())). + /// Does not take lock_, but requires it: caller must ensure lock_ + /// is taken before calling UpdateQueryStatus + Status UpdateQueryStatus(const Status& status) WARN_UNUSED_RESULT; + + /// Cancels the child queries and the coordinator with the given cause. + /// If cause is NULL, assume this was deliberately cancelled by the user. + /// Otherwise, sets state to EXCEPTION. + /// Does nothing if the query has reached EOS or already cancelled. + /// + /// Only returns an error if 'check_inflight' is true and the query is not yet + /// in-flight. 
Otherwise, proceed and return Status::OK() even if the query isn't + /// in-flight (for cleaning up after an error on the query issuing path). + Status Cancel(bool check_inflight, const Status* cause) WARN_UNUSED_RESULT; + + /// This is called when the query is done (finished, cancelled, or failed). + /// Takes lock_: callers must not hold lock() before calling. + void Done(); + + /// Sets the API-specific (Beeswax, HS2) result cache and its size bound. + /// The given cache is owned by this query exec state, even if an error is returned. + /// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum. + Status SetResultCache(QueryResultSet* cache, int64_t max_size) WARN_UNUSED_RESULT; + + ImpalaServer::SessionState* session() const { return session_.get(); } + + /// Queries are run and authorized on behalf of the effective_user. + const std::string& effective_user() const { + return GetEffectiveUser(query_ctx_.session); + } + const std::string& connected_user() const { return query_ctx_.session.connected_user; } + const std::string& do_as_user() const { return query_ctx_.session.delegated_user; } + TSessionType::type session_type() const { return query_ctx_.session.session_type; } + const TUniqueId& session_id() const { return query_ctx_.session.session_id; } + const std::string& default_db() const { return query_ctx_.session.database; } + bool eos() const { return eos_; } + Coordinator* coord() const { return coord_.get(); } + QuerySchedule* schedule() { return schedule_.get(); } + + /// Resource pool associated with this query, or an empty string if the schedule has not + /// been created and had the pool set yet, or this StmtType doesn't go through admission + /// control. + std::string request_pool() const { + return schedule_ == nullptr ? 
"" : schedule_->request_pool(); + } + int num_rows_fetched() const { return num_rows_fetched_; } + void set_fetched_rows() { fetched_rows_ = true; } + bool fetched_rows() const { return fetched_rows_; } + bool returns_result_set() { return !result_metadata_.columns.empty(); } + const TResultSetMetadata* result_metadata() { return &result_metadata_; } + const TUniqueId& query_id() const { return query_ctx_.query_id; } + const TExecRequest& exec_request() const { return exec_request_; } + TStmtType::type stmt_type() const { return exec_request_.stmt_type; } + TCatalogOpType::type catalog_op_type() const { + return exec_request_.catalog_op_request.op_type; + } + TDdlType::type ddl_type() const { + return exec_request_.catalog_op_request.ddl_params.ddl_type; + } + boost::mutex* lock() { return &lock_; } + boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; } + beeswax::QueryState::type query_state() const { return query_state_; } + const Status& query_status() const { return query_status_; } + void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; } + const RuntimeProfile& profile() const { return profile_; } + const RuntimeProfile& summary_profile() const { return summary_profile_; } + const TimestampValue& start_time() const { return start_time_; } + const TimestampValue& end_time() const { return end_time_; } + const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; } + const TQueryOptions& query_options() const { + return query_ctx_.client_request.query_options; + } + /// Returns 0:0 if this is a root query + TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; } + + const std::vector<std::string>& GetAnalysisWarnings() const { + return exec_request_.analysis_warnings; + } + + inline int64_t last_active_ms() const { + boost::lock_guard<boost::mutex> l(expiration_data_lock_); + return last_active_time_ms_; + } + + /// Returns true if Impala is actively processing this query. 
+ inline bool is_active() const { + boost::lock_guard<boost::mutex> l(expiration_data_lock_); + return ref_count_ > 0; + } + + RuntimeProfile::EventSequence* query_events() const { return query_events_; } + RuntimeProfile* summary_profile() { return &summary_profile_; } + + private: + const TQueryCtx query_ctx_; + + /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are + /// responsible for acquiring this lock. To avoid deadlocks, callers must not hold lock_ + /// while acquiring this lock (since FetchRows() will release and re-acquire lock_ during + /// its execution). + /// See "Locking" in the class comment for lock acquisition order. + boost::mutex fetch_rows_lock_; + + /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls + /// - no other locks should be acquired while holding this lock. + mutable boost::mutex expiration_data_lock_; + + /// Stores the last time that the query was actively doing work, in Unix milliseconds. + int64_t last_active_time_ms_; + + /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every + /// time a client instructs Impala to do work on behalf of this query, the ref count is + /// increased, and decreased once that work is completed. + uint32_t ref_count_; + + /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL. + const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_; + + /// Protects all following fields. Acquirers should be careful not to hold it for too + /// long, e.g. during RPCs because this lock is required to make progress on various + /// ImpalaServer requests. If held for too long it can block progress of client + /// requests for this query, e.g. query status and cancellation. Furthermore, until + /// IMPALA-3882 is fixed, it can indirectly block progress on all other queries. + /// See "Locking" in the class comment for lock acquisition order. 
+ boost::mutex lock_; + + /// TODO: remove and use ExecEnv::GetInstance() instead + ExecEnv* exec_env_; + + /// Thread for asynchronously running Wait(). + boost::scoped_ptr<Thread> wait_thread_; + + /// Condition variable to make BlockOnWait() thread-safe. One thread joins + /// wait_thread_, and all other threads block on this cv. Used with lock_. + boost::condition_variable block_on_wait_cv_; + + /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe. + bool is_block_on_wait_joining_; + + /// Session that this query is from + std::shared_ptr<ImpalaServer::SessionState> session_; + + /// Resource assignment determined by scheduler. Owned by obj_pool_. + boost::scoped_ptr<QuerySchedule> schedule_; + + /// Not set for ddl queries. + boost::scoped_ptr<Coordinator> coord_; + + /// Runs statements that query or modify the catalog via the CatalogService. + boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_; + + /// Result set used for requests that return results and are not QUERY + /// statements. For example, EXPLAIN, LOAD, and SHOW use this. + boost::scoped_ptr<std::vector<TResultRow>> request_result_set_; + + /// Cache of the first result_cache_max_size_ query results to allow clients to restart + /// fetching from the beginning of the result set. This cache is appended to in + /// FetchInternal(), and set to NULL if its bound is exceeded. If the bound is exceeded, + /// then clients cannot restart fetching because some results have been lost since the + /// last fetch. Only set if result_cache_max_size_ > 0. + boost::scoped_ptr<QueryResultSet> result_cache_; + + /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching. + int64_t result_cache_max_size_; + + ObjectPool profile_pool_; + + /// The ClientRequestState builds three separate profiles. 
+ /// * profile_ is the top-level profile which houses the other + /// profiles, plus the query timeline + /// * summary_profile_ contains mostly static information about the + /// query, including the query statement, the plan and the user who submitted it. + /// * server_profile_ tracks time spent inside the ImpalaServer, + /// but not inside fragment execution, i.e. the time taken to + /// register and set-up the query and for rows to be fetched. + // + /// There's a fourth profile which is not built here (but is a + /// child of profile_); the execution profile which tracks the + /// actual fragment execution. + RuntimeProfile profile_; + RuntimeProfile server_profile_; + RuntimeProfile summary_profile_; + RuntimeProfile::Counter* row_materialization_timer_; + + /// Tracks how long we are idle waiting for a client to fetch rows. + RuntimeProfile::Counter* client_wait_timer_; + /// Timer to track idle time for the above counter. + MonotonicStopWatch client_wait_sw_; + + RuntimeProfile::EventSequence* query_events_; + + bool is_cancelled_; // if true, Cancel() was called. + bool eos_; // if true, there are no more rows to return + // We enforce the invariant that query_status_ is not OK iff query_state_ + // is EXCEPTION, given that lock_ is held. + beeswax::QueryState::type query_state_; + Status query_status_; + TExecRequest exec_request_; + + TResultSetMetadata result_metadata_; // metadata for select query + RowBatch* current_batch_; // the current row batch; only applicable if coord is set + int current_batch_row_; // number of rows fetched within the current batch + int num_rows_fetched_; // number of rows fetched by client for the entire query + + /// True if a fetch was attempted by a client, regardless of whether a result set + /// (or error) was returned to the client. + bool fetched_rows_; + + /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned. 
+ Frontend* frontend_; + + /// The parent ImpalaServer; called to wait until the the impalad has processed a + /// catalog update request. Not owned. + ImpalaServer* parent_server_; + + /// Start/end time of the query + TimestampValue start_time_, end_time_; + + /// Executes a local catalog operation (an operation that does not need to execute + /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements. + Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT; + + /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not + /// doing any work. Takes expiration_data_lock_. + void MarkInactive(); + + /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being + /// actively processed. Takes expiration_data_lock_. + void MarkActive(); + + /// Core logic of initiating a query or dml execution request. + /// Initiates execution of plan fragments, if there are any, and sets + /// up the output exprs for subsequent calls to FetchRows(). + /// 'coord_' is only valid after this method is called, and may be invalid if it + /// returns an error. + /// Also sets up profile and pre-execution counters. + /// Non-blocking. + Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request) + WARN_UNUSED_RESULT; + + /// Core logic of executing a ddl statement. May internally initiate execution of + /// queries (e.g., compute stats) or dml (e.g., create table as select) + Status ExecDdlRequest() WARN_UNUSED_RESULT; + + /// Executes a LOAD DATA + Status ExecLoadDataRequest() WARN_UNUSED_RESULT; + + /// Core logic of Wait(). Does not update query_state_/status_. + Status WaitInternal() WARN_UNUSED_RESULT; + + /// Core logic of FetchRows(). Does not update query_state_/status_. + /// Caller needs to hold fetch_rows_lock_ and lock_. 
+ Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows) + WARN_UNUSED_RESULT; + + /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in + /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'. + /// result and scales must have been resized to the number of columns before call. + Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales) + WARN_UNUSED_RESULT; + + /// Gather and publish all required updates to the metastore + Status UpdateCatalog() WARN_UNUSED_RESULT; + + /// Copies results into request_result_set_ + /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary + void SetResultSet(const TDdlExecResponse* ddl_resp); + void SetResultSet(const std::vector<std::string>& results); + void SetResultSet(const std::vector<std::string>& col1, + const std::vector<std::string>& col2); + void SetResultSet(const std::vector<std::string>& col1, + const std::vector<std::string>& col2, const std::vector<std::string>& col3, + const std::vector<std::string>& col4); + + /// Sets the result set for a CREATE TABLE AS SELECT statement. The results will not be + /// ready until all BEs complete execution. This can be called as part of Wait(), + /// at which point results will be available. + void SetCreateTableAsSelectResultSet(); + + /// Updates the metastore's table and column statistics based on the child-query results + /// of a compute stats command. + /// TODO: Unify the various ways that the Metastore is updated for DDL/DML. + /// For example, INSERT queries update partition metadata in UpdateCatalog() using a + /// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar + /// purposes. Perhaps INSERT should use a TCatalogOpRequest as well. 
+ Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries) + WARN_UNUSED_RESULT; + + /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption. + /// This function is a no-op if the cache has already been cleared. + void ClearResultCache(); +}; + +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 8f52352..bb35089 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -103,10 +103,11 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( // Allow logging of at least one error, so we can detect and convert it into a // Java exception. query_ctx.client_request.query_options.max_errors = 1; - // Track memory against a dummy "fe-eval-exprs" resource pool - we don't // know what resource pool the query has been assigned to yet. - RuntimeState state(query_ctx, ExecEnv::GetInstance(), "fe-eval-exprs"); + query_ctx.request_pool = "fe-eval-exprs"; + + RuntimeState state(query_ctx, ExecEnv::GetInstance()); // Make sure to close the runtime state no matter how this scope is exited. 
const auto close_runtime_state = MakeScopeExitTrigger([&state]() { state.ReleaseResources(); }); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index bd4ee67..67ecc79 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -25,7 +25,7 @@ #include "runtime/exec-env.h" #include "runtime/raw-value.inline.h" #include "runtime/timestamp-value.h" -#include "service/query-exec-state.h" +#include "service/client-request-state.h" #include "service/query-options.h" #include "service/query-result-set.h" #include "util/impalad-metrics.h" @@ -64,22 +64,22 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) { // raise Syntax error or access violation; it's likely to be syntax/analysis error // TODO: that may not be true; fix this - shared_ptr<QueryExecState> exec_state; - RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state), + shared_ptr<ClientRequestState> request_state; + RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); - exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); + request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); // start thread to wait for results to become available, which will allow // us to advance query state to FINISHED or EXCEPTION - exec_state->WaitAsync(); + request_state->WaitAsync(); // Once the query is running do a final check for session closure and add it to the // set of in-flight queries. 
- Status status = SetQueryInflight(session, exec_state); + Status status = SetQueryInflight(session, request_state); if (!status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &status); + (void) UnregisterQuery(request_state->query_id(), false, &status); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } - TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle); + TUniqueIdToQueryHandle(request_state->query_id(), &query_handle); } void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, @@ -94,7 +94,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, // raise general error for request conversion error; RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR); - shared_ptr<QueryExecState> exec_state; + shared_ptr<ClientRequestState> request_state; DCHECK(session != NULL); // The session should exist. { // The session is created when the client connects. Depending on the underlying @@ -106,27 +106,27 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, // raise Syntax error or access violation; it's likely to be syntax/analysis error // TODO: that may not be true; fix this - RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state), + RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); - exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); + request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); // Once the query is running do a final check for session closure and add it to the // set of in-flight queries. 
- Status status = SetQueryInflight(session, exec_state); + Status status = SetQueryInflight(session, request_state); if (!status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &status); + (void) UnregisterQuery(request_state->query_id(), false, &status); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } // block until results are ready - exec_state->Wait(); - status = exec_state->query_status(); + request_state->Wait(); + status = request_state->query_status(); if (!status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &status); + (void) UnregisterQuery(request_state->query_id(), false, &status); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } - exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); - TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle); + request_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); + TUniqueIdToQueryHandle(request_state->query_id(), &query_handle); // If the input log context id is an empty string, then create a new number and // set it to _return. Otherwise, set _return with the input log context @@ -172,7 +172,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle VLOG_ROW << "fetch result: #results=" << query_results.data.size() << " has_more=" << (query_results.has_more ? 
"true" : "false"); if (!status.ok()) { - UnregisterQuery(query_id, false, &status); + (void) UnregisterQuery(query_id, false, &status); RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } } @@ -188,18 +188,18 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata, TUniqueId query_id; QueryHandleToTUniqueId(handle, &query_id); VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true); - if (UNLIKELY(exec_state.get() == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + if (UNLIKELY(request_state.get() == nullptr)) { RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } { - // make sure we release the lock on exec_state if we see any error - lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); + // make sure we release the lock on request_state if we see any error + lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); // Convert TResultSetMetadata to Beeswax.ResultsMetadata - const TResultSetMetadata* result_set_md = exec_state->result_metadata(); + const TResultSetMetadata* result_set_md = request_state->result_metadata(); results_metadata.__isset.schema = true; results_metadata.schema.__isset.fieldSchemas = true; results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size()); @@ -232,7 +232,7 @@ void ImpalaServer::close(const QueryHandle& handle) { QueryHandleToTUniqueId(handle, &query_id); VLOG_QUERY << "close(): query_id=" << PrintId(query_id); // TODO: do we need to raise an exception if the query state is EXCEPTION? - // TODO: use timeout to get rid of unwanted exec_state. + // TODO: use timeout to get rid of unwanted request_state. 
RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR); } @@ -244,9 +244,9 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) { QueryHandleToTUniqueId(handle, &query_id); VLOG_ROW << "get_state(): query_id=" << PrintId(query_id); - lock_guard<mutex> l(query_exec_state_map_lock_); - QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id); - if (entry != query_exec_state_map_.end()) { + lock_guard<mutex> l(client_request_state_map_lock_); + ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id); + if (entry != client_request_state_map_.end()) { return entry->second->query_state(); } else { VLOG_QUERY << "ImpalaServer::get_state invalid handle"; @@ -277,8 +277,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) { TUniqueId query_id; QueryHandleToTUniqueId(handle, &query_id); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (exec_state.get() == NULL) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (request_state.get() == NULL) { stringstream str; str << "unknown query id: " << query_id; LOG(ERROR) << str.str(); @@ -286,17 +286,17 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) { } stringstream error_log_ss; // If the query status is !ok, include the status error message at the top of the log. 
- if (!exec_state->query_status().ok()) { - error_log_ss << exec_state->query_status().GetDetail() << "\n"; + if (!request_state->query_status().ok()) { + error_log_ss << request_state->query_status().GetDetail() << "\n"; } // Add warnings from analysis - error_log_ss << join(exec_state->GetAnalysisWarnings(), "\n"); + error_log_ss << join(request_state->GetAnalysisWarnings(), "\n"); // Add warnings from execution - if (exec_state->coord() != NULL) { - if (!exec_state->query_status().ok()) error_log_ss << "\n\n"; - error_log_ss << exec_state->coord()->GetErrorLog(); + if (request_state->coord() != NULL) { + if (!request_state->query_status().ok()) error_log_ss << "\n\n"; + error_log_ss << request_state->coord()->GetErrorLog(); } log = error_log_ss.str(); } @@ -455,30 +455,31 @@ inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle, Status ImpalaServer::FetchInternal(const TUniqueId& query_id, const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) { - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (UNLIKELY(exec_state == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (UNLIKELY(request_state == nullptr)) { return Status(Substitute("Invalid query handle: $0", PrintId(query_id))); } - // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures - // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_, - // which are evaluated in QueryExecState::FetchRows() below). - exec_state->BlockOnWait(); + // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait() + // ensures that rows are ready to be fetched (e.g., Wait() opens + // ClientRequestState::output_exprs_, which are evaluated in + // ClientRequestState::FetchRows() below). 
+ request_state->BlockOnWait(); - lock_guard<mutex> frl(*exec_state->fetch_rows_lock()); - lock_guard<mutex> l(*exec_state->lock()); + lock_guard<mutex> frl(*request_state->fetch_rows_lock()); + lock_guard<mutex> l(*request_state->lock()); - if (exec_state->num_rows_fetched() == 0) { - exec_state->query_events()->MarkEvent("First row fetched"); - exec_state->set_fetched_rows(); + if (request_state->num_rows_fetched() == 0) { + request_state->query_events()->MarkEvent("First row fetched"); + request_state->set_fetched_rows(); } // Check for cancellation or an error. - RETURN_IF_ERROR(exec_state->query_status()); + RETURN_IF_ERROR(request_state->query_status()); // ODBC-190: set Beeswax's Results.columns to work around bug ODBC-190; // TODO: remove the block of code when ODBC-190 is resolved. - const TResultSetMetadata* result_metadata = exec_state->result_metadata(); + const TResultSetMetadata* result_metadata = request_state->result_metadata(); query_results->columns.resize(result_metadata->columns.size()); for (int i = 0; i < result_metadata->columns.size(); ++i) { // TODO: As of today, the ODBC driver does not support boolean and timestamp data @@ -498,16 +499,16 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, query_results->__set_ready(true); // It's likely that ODBC doesn't care about start_row, but Hue needs it. For Hue, // start_row starts from zero, not one. 
- query_results->__set_start_row(exec_state->num_rows_fetched()); + query_results->__set_start_row(request_state->num_rows_fetched()); Status fetch_rows_status; query_results->data.clear(); - if (!exec_state->eos()) { + if (!request_state->eos()) { scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet( - *exec_state->result_metadata(), &query_results->data)); - fetch_rows_status = exec_state->FetchRows(fetch_size, result_set.get()); + *request_state->result_metadata(), &query_results->data)); + fetch_rows_status = request_state->FetchRows(fetch_size, result_set.get()); } - query_results->__set_has_more(!exec_state->eos()); + query_results->__set_has_more(!request_state->eos()); query_results->__isset.data = true; return fetch_rows_status; @@ -515,15 +516,15 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id, TInsertResult* insert_result) { - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true); - if (UNLIKELY(exec_state == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + if (UNLIKELY(request_state == nullptr)) { return Status(Substitute("Invalid query handle: $0", PrintId(query_id))); } Status query_status; { - lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); - query_status = exec_state->query_status(); + lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + query_status = request_state->query_status(); if (query_status.ok()) { // Coord may be NULL for a SELECT with LIMIT 0. // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might @@ -531,9 +532,9 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id, // coordinator, depending on how we choose to drive the table sink. 
int64_t num_row_errors = 0; bool has_kudu_stats = false; - if (exec_state->coord() != NULL) { + if (request_state->coord() != NULL) { for (const PartitionStatusMap::value_type& v: - exec_state->coord()->per_partition_status()) { + request_state->coord()->per_partition_status()) { const pair<string, TInsertPartitionStatus> partition_status = v; insert_result->rows_modified[partition_status.first] = partition_status.second.num_modified_rows;
