http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 51a04f2..b428512 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -38,7 +38,7 @@ #include "runtime/raw-value.h" #include "runtime/exec-env.h" #include "service/hs2-util.h" -#include "service/query-exec-state.h" +#include "service/client-request-state.h" #include "service/query-options.h" #include "service/query-result-set.h" #include "util/debug-util.h" @@ -133,7 +133,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, session->ToThrift(session_id, &query_ctx.session); request->__set_session(query_ctx.session); - shared_ptr<QueryExecState> exec_state; + shared_ptr<ClientRequestState> request_state; // There is no user-supplied query text available because this metadata operation comes // from an RPC. As a best effort, we use the type of the operation. map<int, const char*>::const_iterator query_text_it = @@ -141,9 +141,9 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ? "N/A" : query_text_it->second; query_ctx.client_request.stmt = query_text; - exec_state.reset(new QueryExecState(query_ctx, exec_env_, + request_state.reset(new ClientRequestState(query_ctx, exec_env_, exec_env_->frontend(), this, session)); - Status register_status = RegisterQuery(session, exec_state); + Status register_status = RegisterQuery(session, request_state); if (!register_status.ok()) { status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); status->__set_errorMessage(register_status.GetDetail()); @@ -151,20 +151,20 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, return; } - Status exec_status = exec_state->Exec(*request); + Status exec_status = request_state->Exec(*request); if (!exec_status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &exec_status); + (void) UnregisterQuery(request_state->query_id(), false, &exec_status); status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); status->__set_errorMessage(exec_status.GetDetail()); status->__set_sqlState(SQLSTATE_GENERAL_ERROR); return; } - exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); + request_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); - Status inflight_status = SetQueryInflight(session, exec_state); + Status inflight_status = SetQueryInflight(session, request_state); if (!inflight_status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &inflight_status); + (void) UnregisterQuery(request_state->query_id(), false, &inflight_status); status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); status->__set_errorMessage(inflight_status.GetDetail()); status->__set_sqlState(SQLSTATE_GENERAL_ERROR); @@ -172,55 +172,56 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, } handle->__set_hasResultSet(true); // TODO: create secret for operationId - TUniqueId operation_id = exec_state->query_id(); + TUniqueId operation_id = request_state->query_id(); TUniqueIdToTHandleIdentifier(operation_id, operation_id, &(handle->operationId)); status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); } Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool fetch_first, TFetchResultsResp* fetch_results) { - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (UNLIKELY(exec_state == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (UNLIKELY(request_state == nullptr)) { return Status(Substitute("Invalid query handle: $0", PrintId(query_id))); } // FetchResults doesn't have an associated session handle, so we presume that this // request should keep alive the same session that orignated the query. ScopedSessionState session_handle(this); - const TUniqueId session_id = exec_state->session_id(); + const TUniqueId session_id = request_state->session_id(); shared_ptr<SessionState> session; RETURN_IF_ERROR(session_handle.WithSession(session_id, &session)); - // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures - // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_, - // which are evaluated in QueryExecState::FetchRows() below). - exec_state->BlockOnWait(); + // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait() + // ensures that rows are ready to be fetched (e.g., Wait() opens + // ClientRequestState::output_exprs_, which are evaluated in + // ClientRequestState::FetchRows() below). + request_state->BlockOnWait(); - lock_guard<mutex> frl(*exec_state->fetch_rows_lock()); - lock_guard<mutex> l(*exec_state->lock()); + lock_guard<mutex> frl(*request_state->fetch_rows_lock()); + lock_guard<mutex> l(*request_state->lock()); // Check for cancellation or an error. - RETURN_IF_ERROR(exec_state->query_status()); + RETURN_IF_ERROR(request_state->query_status()); - if (exec_state->num_rows_fetched() == 0) { - exec_state->query_events()->MarkEvent("First row fetched"); - exec_state->set_fetched_rows(); + if (request_state->num_rows_fetched() == 0) { + request_state->query_events()->MarkEvent("First row fetched"); + request_state->set_fetched_rows(); } - if (fetch_first) RETURN_IF_ERROR(exec_state->RestartFetch()); + if (fetch_first) RETURN_IF_ERROR(request_state->RestartFetch()); - fetch_results->results.__set_startRowOffset(exec_state->num_rows_fetched()); + fetch_results->results.__set_startRowOffset(request_state->num_rows_fetched()); // Child queries should always return their results in row-major format, rather than // inheriting the parent session's setting. - bool is_child_query = exec_state->parent_query_id() != TUniqueId(); + bool is_child_query = request_state->parent_query_id() != TUniqueId(); TProtocolVersion::type version = is_child_query ? TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version; scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet( - version, *(exec_state->result_metadata()), &(fetch_results->results))); - RETURN_IF_ERROR(exec_state->FetchRows(fetch_size, result_set.get())); + version, *(request_state->result_metadata()), &(fetch_results->results))); + RETURN_IF_ERROR(request_state->FetchRows(fetch_size, result_set.get())); fetch_results->__isset.results = true; - fetch_results->__set_hasMoreRows(!exec_state->eos()); + fetch_results->__set_hasMoreRows(!request_state->eos()); return Status::OK(); } @@ -451,36 +452,36 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, } } - shared_ptr<QueryExecState> exec_state; - status = Execute(&query_ctx, session, &exec_state); + shared_ptr<ClientRequestState> request_state; + status = Execute(&query_ctx, session, &request_state); HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); - // Optionally enable result caching on the QueryExecState. + // Optionally enable result caching on the ClientRequestState. if (cache_num_rows > 0) { - status = exec_state->SetResultCache( + status = request_state->SetResultCache( QueryResultSet::CreateHS2ResultSet( - session->hs2_version, *exec_state->result_metadata(), nullptr), + session->hs2_version, *request_state->result_metadata(), nullptr), cache_num_rows); if (!status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &status); + (void) UnregisterQuery(request_state->query_id(), false, &status); HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } } - exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); + request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); // Start thread to wait for results to become available. - exec_state->WaitAsync(); + request_state->WaitAsync(); // Once the query is running do a final check for session closure and add it to the // set of in-flight queries. - status = SetQueryInflight(session, exec_state); + status = SetQueryInflight(session, request_state); if (!status.ok()) { - UnregisterQuery(exec_state->query_id(), false, &status); + (void) UnregisterQuery(request_state->query_id(), false, &status); HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } return_val.__isset.operationHandle = true; return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT); - return_val.operationHandle.__set_hasResultSet(exec_state->returns_result_set()); - // TODO: create secret for operationId and store the secret in exec_state - TUniqueIdToTHandleIdentifier(exec_state->query_id(), exec_state->query_id(), + return_val.operationHandle.__set_hasResultSet(request_state->returns_result_set()); + // TODO: create secret for operationId and store the secret in request_state + TUniqueIdToTHandleIdentifier(request_state->query_id(), request_state->query_id(), &return_val.operationHandle.operationId); return_val.status.__set_statusCode( apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS); @@ -633,30 +634,30 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val, request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (UNLIKELY(exec_state.get() == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } ScopedSessionState session_handle(this); - const TUniqueId session_id = exec_state->session_id(); + const TUniqueId session_id = request_state->session_id(); shared_ptr<SessionState> session; HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session), SQLSTATE_GENERAL_ERROR); { - lock_guard<mutex> l(*exec_state->lock()); + lock_guard<mutex> l(*request_state->lock()); TOperationState::type operation_state = QueryStateToTOperationState( - exec_state->query_state()); + request_state->query_state()); return_val.__set_operationState(operation_state); if (operation_state == TOperationState::ERROR_STATE) { - DCHECK(!exec_state->query_status().ok()); - return_val.__set_errorMessage(exec_state->query_status().GetDetail()); + DCHECK(!request_state->query_status().ok()); + return_val.__set_errorMessage(request_state->query_status().GetDetail()); return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR); } else { - DCHECK(exec_state->query_status().ok()); + DCHECK(request_state->query_status().ok()); } } } @@ -669,14 +670,14 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val, request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (UNLIKELY(exec_state.get() == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } ScopedSessionState session_handle(this); - const TUniqueId session_id = exec_state->session_id(); + const TUniqueId session_id = request_state->session_id(); HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id), SQLSTATE_GENERAL_ERROR); HS2_RETURN_IF_ERROR(return_val, CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR); @@ -691,17 +692,17 @@ void ImpalaServer::CloseOperation(TCloseOperationResp& return_val, request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (UNLIKELY(exec_state.get() == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } ScopedSessionState session_handle(this); - const TUniqueId session_id = exec_state->session_id(); + const TUniqueId session_id = request_state->session_id(); HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id), SQLSTATE_GENERAL_ERROR); - // TODO: use timeout to get rid of unwanted exec_state. + // TODO: use timeout to get rid of unwanted request_state. HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR); return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); @@ -730,19 +731,19 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val, HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id), SQLSTATE_GENERAL_ERROR); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true); - if (UNLIKELY(exec_state.get() == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + if (UNLIKELY(request_state.get() == nullptr)) { VLOG_QUERY << "GetResultSetMetadata(): invalid query handle"; // No handle was found HS2_RETURN_ERROR(return_val, Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } { - // make sure we release the lock on exec_state if we see any error - lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); + // make sure we release the lock on request_state if we see any error + lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); // Convert TResultSetMetadata to TGetResultSetMetadataResp - const TResultSetMetadata* result_set_md = exec_state->result_metadata(); + const TResultSetMetadata* result_set_md = request_state->result_metadata(); DCHECK(result_set_md != NULL); if (result_set_md->columns.size() > 0) { return_val.__isset.schema = true; @@ -793,7 +794,7 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val, if (status.IsRecoverableError()) { DCHECK(fetch_first); } else { - UnregisterQuery(query_id, false, &status); + (void) UnregisterQuery(query_id, false, &status); } HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } @@ -806,8 +807,8 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) { HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (UNLIKELY(exec_state.get() == nullptr)) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); @@ -816,20 +817,20 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) { // GetLog doesn't have an associated session handle, so we presume that this request // should keep alive the same session that orignated the query. ScopedSessionState session_handle(this); - const TUniqueId session_id = exec_state->session_id(); + const TUniqueId session_id = request_state->session_id(); HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id), SQLSTATE_GENERAL_ERROR); stringstream ss; - if (exec_state->coord() != NULL) { + if (request_state->coord() != NULL) { // Report progress - ss << exec_state->coord()->progress().ToString() << "\n"; + ss << request_state->coord()->progress().ToString() << "\n"; } // Report analysis errors - ss << join(exec_state->GetAnalysisWarnings(), "\n"); - if (exec_state->coord() != NULL) { + ss << join(request_state->GetAnalysisWarnings(), "\n"); + if (request_state->coord() != NULL) { // Report execution errors - ss << exec_state->coord()->GetErrorLog(); + ss << request_state->coord()->GetErrorLog(); } return_val.log = ss.str(); return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index b9f3382..6ccb6df 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -28,7 +28,7 @@ #include "runtime/mem-tracker.h" #include "runtime/query-state.h" #include "service/impala-server.h" -#include "service/query-exec-state.h" +#include "service/client-request-state.h" #include "thrift/protocol/TDebugProtocol.h" #include "util/coding-util.h" #include "util/logging-support.h" @@ -237,11 +237,11 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap& void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& args, Document* document) { - lock_guard<mutex> l(server_->query_exec_state_map_lock_); + lock_guard<mutex> l(server_->client_request_state_map_lock_); stringstream ss; - for (const ImpalaServer::QueryExecStateMap::value_type& exec_state: - server_->query_exec_state_map_) { - ss << exec_state.second->query_id() << "\n"; + for (const ImpalaServer::ClientRequestStateMap::value_type& request_state: + server_->client_request_state_map_) { + ss << request_state.second->query_id() << "\n"; } document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator()); Value query_ids(ss.str().c_str(), document->GetAllocator()); @@ -361,11 +361,11 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args, set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan> sorted_query_records; { - lock_guard<mutex> l(server_->query_exec_state_map_lock_); - for (const ImpalaServer::QueryExecStateMap::value_type& exec_state: - server_->query_exec_state_map_) { + lock_guard<mutex> l(server_->client_request_state_map_lock_); + for (const ImpalaServer::ClientRequestStateMap::value_type& request_state: + server_->client_request_state_map_) { // TODO: Do this in the browser so that sorts on other keys are possible. - sorted_query_records.insert(ImpalaServer::QueryStateRecord(*exec_state.second)); + sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state.second)); } } @@ -708,27 +708,26 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include // Search the in-flight queries first, followed by the archived ones. { - shared_ptr<ImpalaServer::QueryExecState> exec_state = - server_->GetQueryExecState(query_id, true); - if (exec_state != NULL) { + shared_ptr<ClientRequestState> request_state = + server_->GetClientRequestState(query_id, true); + if (request_state != NULL) { found = true; - lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); - if (exec_state->coord() == NULL) { + lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + if (request_state->coord() == NULL) { const string& err = Substitute("Invalid query id: $0", PrintId(query_id)); Value json_error(err.c_str(), document->GetAllocator()); document->AddMember("error", json_error, document->GetAllocator()); return; } - query_status = exec_state->query_status(); - stmt = exec_state->sql_stmt(); - plan = exec_state->exec_request().query_exec_request.query_plan; + query_status = request_state->query_status(); + stmt = request_state->sql_stmt(); + plan = request_state->exec_request().query_exec_request.query_plan; if (include_json_plan || include_summary) { - lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock()); - summary = exec_state->coord()->exec_summary(); + request_state->coord()->GetTExecSummary(&summary); } if (include_json_plan) { for (const TPlanExecInfo& plan_exec_info: - exec_state->exec_request().query_exec_request.plan_exec_info) { + request_state->exec_request().query_exec_request.plan_exec_info) { for (const TPlanFragment& fragment: plan_exec_info.fragments) { fragments.push_back(fragment); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-internal-service.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc index a9f7f01..d8a2a4a 100644 --- a/be/src/service/impala-internal-service.cc +++ b/be/src/service/impala-internal-service.cc @@ -39,12 +39,15 @@ ImpalaInternalService::ImpalaInternalService() { DCHECK(query_exec_mgr_ != nullptr); } -void ImpalaInternalService::ExecPlanFragment(TExecPlanFragmentResult& return_val, - const TExecPlanFragmentParams& params) { - VLOG_QUERY << "ExecPlanFragment():" - << " instance_id=" << params.fragment_instance_ctx.fragment_instance_id; - FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT); - query_exec_mgr_->StartFInstance(params).SetTStatus(&return_val); +void ImpalaInternalService::ExecQueryFInstances(TExecQueryFInstancesResult& return_val, + const TExecQueryFInstancesParams& params) { + VLOG_QUERY << "ExecQueryFInstances():" << " query_id=" << params.query_ctx.query_id; + FAULT_INJECTION_RPC_DELAY(RPC_EXECQUERYFINSTANCES); + DCHECK(params.__isset.coord_state_idx); + DCHECK(params.__isset.query_ctx); + DCHECK(params.__isset.fragment_ctxs); + DCHECK(params.__isset.fragment_instance_ctxs); + query_exec_mgr_->StartQuery(params).SetTStatus(&return_val); } template <typename T> void SetUnknownIdError( @@ -54,53 +57,54 @@ template <typename T> void SetUnknownIdError( status.SetTStatus(status_container); } -void ImpalaInternalService::CancelPlanFragment(TCancelPlanFragmentResult& return_val, - const TCancelPlanFragmentParams& params) { - VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id; - FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT); - QueryState::ScopedRef qs(GetQueryId(params.fragment_instance_id)); +void ImpalaInternalService::CancelQueryFInstances( + TCancelQueryFInstancesResult& return_val, + const TCancelQueryFInstancesParams& params) { + VLOG_QUERY << "CancelQueryFInstances(): query_id=" << params.query_id; + FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES); + DCHECK(params.__isset.query_id); + QueryState::ScopedRef qs(params.query_id); if (qs.get() == nullptr) { - SetUnknownIdError("query", GetQueryId(params.fragment_instance_id), &return_val); + SetUnknownIdError("query", params.query_id, &return_val); return; } - FragmentInstanceState* fis = qs->GetFInstanceState(params.fragment_instance_id); - if (fis == nullptr) { - SetUnknownIdError("instance", params.fragment_instance_id, &return_val); - return; - } - Status status = fis->Cancel(); - status.SetTStatus(&return_val); + qs->Cancel(); } void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val, const TReportExecStatusParams& params) { - VLOG_QUERY << "ReportExecStatus(): instance_id=" << params.fragment_instance_id; FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS); + DCHECK(params.__isset.query_id); + DCHECK(params.__isset.coord_state_idx); impala_server_->ReportExecStatus(return_val, params); } void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val, const TTransmitDataParams& params) { FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA); + DCHECK(params.__isset.dest_fragment_instance_id); + DCHECK(params.__isset.sender_id); + DCHECK(params.__isset.dest_node_id); impala_server_->TransmitData(return_val, params); } void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params) { - VLOG_QUERY << "UpdateFilter(): filter=" << params.filter_id - << " query_id=" << PrintId(params.query_id); FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER); + DCHECK(params.__isset.filter_id); + DCHECK(params.__isset.query_id); + DCHECK(params.__isset.bloom_filter); impala_server_->UpdateFilter(return_val, params); } void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val, const TPublishFilterParams& params) { - VLOG_QUERY << "PublishFilter(): filter=" << params.filter_id - << " instance_id=" << PrintId(params.dst_instance_id); FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER); - QueryState::ScopedRef qs(GetQueryId(params.dst_instance_id)); + DCHECK(params.__isset.filter_id); + DCHECK(params.__isset.dst_query_id); + DCHECK(params.__isset.dst_fragment_idx); + DCHECK(params.__isset.bloom_filter); + QueryState::ScopedRef qs(params.dst_query_id); if (qs.get() == nullptr) return; - FragmentInstanceState* fis = qs->GetFInstanceState(params.dst_instance_id); - if (fis == nullptr) return; - fis->PublishFilter(params.filter_id, params.bloom_filter); + qs->PublishFilter(params.filter_id, params.dst_fragment_idx, params.bloom_filter); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-internal-service.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h index 372d617..3285d9d 100644 --- a/be/src/service/impala-internal-service.h +++ b/be/src/service/impala-internal-service.h @@ -31,10 +31,10 @@ class QueryExecMgr; class ImpalaInternalService : public ImpalaInternalServiceIf { public: ImpalaInternalService(); - virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val, - const TExecPlanFragmentParams& params); - virtual void CancelPlanFragment(TCancelPlanFragmentResult& return_val, - const TCancelPlanFragmentParams& params); + virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val, + const TExecQueryFInstancesParams& params); + virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val, + const TCancelQueryFInstancesParams& params); virtual void ReportExecStatus(TReportExecStatusResult& return_val, const TReportExecStatusParams& params); virtual void TransmitData(TTransmitDataResult& return_val, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 6132918..1b2e8fe 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -57,7 +57,7 @@ #include "scheduling/scheduler.h" #include "service/impala-http-handler.h" #include "service/impala-internal-service.h" -#include "service/query-exec-state.h" +#include "service/client-request-state.h" #include "util/bit-util.h" #include "util/container-util.h" #include "util/debug-util.h" @@ -381,8 +381,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) exec_env_->SetImpalaServer(this); } -Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) { - const TExecRequest& request = query_exec_state.exec_request(); +Status ImpalaServer::LogLineageRecord(const ClientRequestState& client_request_state) { + const TExecRequest& request = client_request_state.exec_request(); if (!request.__isset.query_exec_request && !request.__isset.catalog_op_request) { return Status::OK(); } @@ -397,7 +397,7 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) { return Status::OK(); } // Set the query end time in TLineageGraph. Must use UNIX time directly rather than - // e.g. converting from query_exec_state.end_time() (IMPALA-4440). + // e.g. converting from client_request_state.end_time() (IMPALA-4440). lineage_graph.__set_ended(UnixMillis() / 1000); string lineage_record; LineageUtil::TLineageToJSON(lineage_graph, &lineage_record); @@ -436,7 +436,7 @@ Status ImpalaServer::InitLineageLogging() { return Status::OK(); } -Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_state, +Status ImpalaServer::LogAuditRecord(const ClientRequestState& request_state, const TExecRequest& request) { stringstream ss; rapidjson::StringBuffer buffer; @@ -448,24 +448,24 @@ Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_sta writer.String(ss.str().c_str()); writer.StartObject(); writer.String("query_id"); - writer.String(PrintId(exec_state.query_id()).c_str()); + writer.String(PrintId(request_state.query_id()).c_str()); writer.String("session_id"); - writer.String(PrintId(exec_state.session_id()).c_str()); + writer.String(PrintId(request_state.session_id()).c_str()); writer.String("start_time"); - writer.String(exec_state.start_time().DebugString().c_str()); + writer.String(request_state.start_time().DebugString().c_str()); writer.String("authorization_failure"); - writer.Bool(Frontend::IsAuthorizationError(exec_state.query_status())); + writer.Bool(Frontend::IsAuthorizationError(request_state.query_status())); writer.String("status"); - writer.String(exec_state.query_status().GetDetail().c_str()); + writer.String(request_state.query_status().GetDetail().c_str()); writer.String("user"); - writer.String(exec_state.effective_user().c_str()); + writer.String(request_state.effective_user().c_str()); writer.String("impersonator"); - if (exec_state.do_as_user().empty()) { + if (request_state.do_as_user().empty()) { // If there is no do_as_user() is empty, the "impersonator" field should be Null. writer.Null(); } else { // Otherwise, the delegator is the current connected user. - writer.String(exec_state.connected_user().c_str()); + writer.String(request_state.connected_user().c_str()); } writer.String("statement_type"); if (request.stmt_type == TStmtType::DDL) { @@ -480,9 +480,9 @@ Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_sta } writer.String("network_address"); writer.String( - lexical_cast<string>(exec_state.session()->network_address).c_str()); + lexical_cast<string>(request_state.session()->network_address).c_str()); writer.String("sql_statement"); - string stmt = replace_all_copy(exec_state.sql_stmt(), "\n", " "); + string stmt = replace_all_copy(request_state.sql_stmt(), "\n", " "); Redact(&stmt); writer.String(stmt.c_str()); writer.String("catalog_objects"); @@ -528,14 +528,14 @@ Status ImpalaServer::InitAuditEventLogging() { return Status::OK(); } -void ImpalaServer::LogQueryEvents(const QueryExecState& exec_state) { - Status status = exec_state.query_status(); +void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) { + Status status = request_state.query_status(); bool log_events = true; - switch (exec_state.stmt_type()) { + switch (request_state.stmt_type()) { case TStmtType::QUERY: { // If the query didn't finish, log audit and lineage events only if the // the client issued at least one fetch. - if (!status.ok() && !exec_state.fetched_rows()) log_events = false; + if (!status.ok() && !request_state.fetched_rows()) log_events = false; break; } case TStmtType::DML: { @@ -543,13 +543,13 @@ void ImpalaServer::LogQueryEvents(const QueryExecState& exec_state) { break; } case TStmtType::DDL: { - if (exec_state.catalog_op_type() == TCatalogOpType::DDL) { + if (request_state.catalog_op_type() == TCatalogOpType::DDL) { // For a DDL operation, log audit and lineage events only if the // operation finished. if (!status.ok()) log_events = false; } else { // This case covers local catalog operations such as SHOW and DESCRIBE. - if (!status.ok() && !exec_state.fetched_rows()) log_events = false; + if (!status.ok() && !request_state.fetched_rows()) log_events = false; } break; } @@ -561,11 +561,13 @@ void ImpalaServer::LogQueryEvents(const QueryExecState& exec_state) { } // Log audit events that are due to an AuthorizationException. if (IsAuditEventLoggingEnabled() && - (Frontend::IsAuthorizationError(exec_state.query_status()) || log_events)) { - LogAuditRecord(exec_state, exec_state.exec_request()); + (Frontend::IsAuthorizationError(request_state.query_status()) || log_events)) { + // TODO: deal with an error status + (void) LogAuditRecord(request_state, request_state.exec_request()); } if (IsLineageLoggingEnabled() && log_events) { - LogLineageRecord(exec_state); + // TODO: deal with an error status + (void) LogLineageRecord(request_state); } } @@ -592,13 +594,13 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, DCHECK(output != nullptr); // Search for the query id in the active query map { - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (exec_state.get() != nullptr) { - lock_guard<mutex> l(*exec_state->lock()); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (request_state.get() != nullptr) { + lock_guard<mutex> l(*request_state->lock()); if (base64_encoded) { - exec_state->profile().SerializeToArchiveString(output); + request_state->profile().SerializeToArchiveString(output); } else { - exec_state->profile().PrettyPrint(output); + request_state->profile().PrettyPrint(output); } return Status::OK(); } @@ -625,20 +627,17 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* result) { // Search for the query id in the active query map. { - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true); - if (exec_state != nullptr) { - lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); - if (exec_state->coord() != nullptr) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + if (request_state != nullptr) { + lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + if (request_state->coord() != nullptr) { + request_state->coord()->GetTExecSummary(result); + TExecProgress progress; - { - lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock()); - *result = exec_state->coord()->exec_summary(); - - // Update the current scan range progress for the summary. - progress.__set_num_completed_scan_ranges( - exec_state->coord()->progress().num_complete()); - progress.__set_total_scan_ranges(exec_state->coord()->progress().total()); - } + progress.__set_num_completed_scan_ranges( + request_state->coord()->progress().num_complete()); + progress.__set_total_scan_ranges(request_state->coord()->progress().total()); + // TODO: does this not need to be synchronized? result->__set_progress(progress); return Status::OK(); } @@ -694,7 +693,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* res } } -void ImpalaServer::ArchiveQuery(const QueryExecState& query) { +void ImpalaServer::ArchiveQuery(const ClientRequestState& query) { const string& encoded_profile_str = query.profile().SerializeToArchiveString(); // If there was an error initialising archival (e.g. directory is not writeable), @@ -714,10 +713,7 @@ void ImpalaServer::ArchiveQuery(const QueryExecState& query) { if (FLAGS_query_log_size == 0) return; QueryStateRecord record(query, true, encoded_profile_str); - if (query.coord() != nullptr) { - lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock()); - record.exec_summary = query.coord()->exec_summary(); - } + if (query.coord() != nullptr) query.coord()->GetTExecSummary(&record.exec_summary); { lock_guard<mutex> l(query_log_lock_); // Add record to the beginning of the log, and to the lookup index. @@ -775,7 +771,7 @@ void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx, Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state, - shared_ptr<QueryExecState>* exec_state) { + shared_ptr<ClientRequestState>* request_state) { PrepareQueryContext(query_ctx); ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L); @@ -784,11 +780,11 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, Redact(&stmt); query_ctx->client_request.__set_redacted_stmt((const string) stmt); - bool registered_exec_state; - Status status = ExecuteInternal(*query_ctx, session_state, ®istered_exec_state, - exec_state); - if (!status.ok() && registered_exec_state) { - UnregisterQuery((*exec_state)->query_id(), false, &status); + bool registered_request_state; + Status status = ExecuteInternal(*query_ctx, session_state, ®istered_request_state, + request_state); + if (!status.ok() && registered_request_state) { + (void) UnregisterQuery((*request_state)->query_id(), false, &status); } return status; } @@ -796,50 +792,50 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, Status ImpalaServer::ExecuteInternal( const TQueryCtx& query_ctx, shared_ptr<SessionState> session_state, - bool* registered_exec_state, - shared_ptr<QueryExecState>* exec_state) { + bool* registered_request_state, + shared_ptr<ClientRequestState>* request_state) { DCHECK(session_state != nullptr); - *registered_exec_state = false; + *registered_request_state = false; - exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(), + request_state->reset(new ClientRequestState(query_ctx, exec_env_, exec_env_->frontend(), this, session_state)); - (*exec_state)->query_events()->MarkEvent("Query submitted"); + (*request_state)->query_events()->MarkEvent("Query submitted"); TExecRequest result; { - // Keep a lock on exec_state so that registration and setting + // Keep a lock on request_state so that registration and setting // result_metadata are atomic. // - // Note: this acquires the exec_state lock *before* the - // query_exec_state_map_ lock. This is the opposite of - // GetQueryExecState(..., true), and therefore looks like a + // Note: this acquires the request_state lock *before* the + // client_request_state_map_ lock. This is the opposite of + // GetClientRequestState(..., true), and therefore looks like a // candidate for deadlock. The reason this works here is that - // GetQueryExecState cannot find exec_state (under the exec state + // GetClientRequestState cannot find request_state (under the exec state // map lock) and take it's lock until RegisterQuery has // finished. By that point, the exec state map lock will have been // given up, so the classic deadlock interleaving is not possible. - lock_guard<mutex> l(*(*exec_state)->lock()); + lock_guard<mutex> l(*(*request_state)->lock()); // register exec state as early as possible so that queries that // take a long time to plan show up, and to handle incoming status // reports before execution starts. - RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state)); - *registered_exec_state = true; + RETURN_IF_ERROR(RegisterQuery(session_state, *request_state)); + *registered_request_state = true; - RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus( + RETURN_IF_ERROR((*request_state)->UpdateQueryStatus( exec_env_->frontend()->GetExecRequest(query_ctx, &result))); - (*exec_state)->query_events()->MarkEvent("Planning finished"); - (*exec_state)->summary_profile()->AddEventSequence( + (*request_state)->query_events()->MarkEvent("Planning finished"); + (*request_state)->summary_profile()->AddEventSequence( result.timeline.name, result.timeline); if (result.__isset.result_set_metadata) { - (*exec_state)->set_result_metadata(result.result_set_metadata); + (*request_state)->set_result_metadata(result.result_set_metadata); } } VLOG(2) << "Execution request: " << ThriftDebugString(result); // start execution of query; also starts fragment status reports - RETURN_IF_ERROR((*exec_state)->Exec(&result)); + RETURN_IF_ERROR((*request_state)->Exec(&result)); if (result.stmt_type == TStmtType::DDL) { Status status = UpdateCatalogMetrics(); if (!status.ok()) { @@ -847,13 +843,13 @@ Status ImpalaServer::ExecuteInternal( } } - if ((*exec_state)->coord() != nullptr) { + if ((*request_state)->coord() != nullptr) { const unordered_set<TNetworkAddress>& unique_hosts = - (*exec_state)->schedule()->unique_hosts(); + (*request_state)->schedule()->unique_hosts(); if (!unique_hosts.empty()) { lock_guard<mutex> l(query_locations_lock_); for (const TNetworkAddress& port: unique_hosts) { - query_locations_[port].insert((*exec_state)->query_id()); + query_locations_[port].insert((*request_state)->query_id()); } } } @@ -874,32 +870,32 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) { } Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state, - const shared_ptr<QueryExecState>& exec_state) { + const shared_ptr<ClientRequestState>& request_state) { lock_guard<mutex> l2(session_state->lock); // The session wasn't expired at the time it was checked out and it isn't allowed to // expire while checked out, so it must not be expired. DCHECK(session_state->ref_count > 0 && !session_state->expired); // The session may have been closed after it was checked out. if (session_state->closed) return Status("Session has been closed, ignoring query."); - const TUniqueId& query_id = exec_state->query_id(); + const TUniqueId& query_id = request_state->query_id(); { - lock_guard<mutex> l(query_exec_state_map_lock_); - QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id); - if (entry != query_exec_state_map_.end()) { + lock_guard<mutex> l(client_request_state_map_lock_); + ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id); + if (entry != client_request_state_map_.end()) { // There shouldn't be an active query with that same id. // (query_id is globally unique) stringstream ss; ss << "query id " << PrintId(query_id) << " already exists"; return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str())); } - query_exec_state_map_.insert(make_pair(query_id, exec_state)); + client_request_state_map_.insert(make_pair(query_id, request_state)); } return Status::OK(); } Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state, - const shared_ptr<QueryExecState>& exec_state) { - const TUniqueId& query_id = exec_state->query_id(); + const shared_ptr<ClientRequestState>& request_state) { + const TUniqueId& query_id = request_state->query_id(); lock_guard<mutex> l(session_state->lock); // The session wasn't expired at the time it was checked out and it isn't allowed to // expire while checked out, so it must not be expired. @@ -911,7 +907,7 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state, session_state->inflight_queries.insert(query_id); ++session_state->total_queries; // Set query expiration. - int32_t timeout_s = exec_state->query_options().query_timeout_s; + int32_t timeout_s = request_state->query_options().query_timeout_s; if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) { timeout_s = min(FLAGS_idle_query_timeout, timeout_s); } else { @@ -935,55 +931,52 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli RETURN_IF_ERROR(CancelInternal(query_id, check_inflight, cause)); - shared_ptr<QueryExecState> exec_state; + shared_ptr<ClientRequestState> request_state; { - lock_guard<mutex> l(query_exec_state_map_lock_); - QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id); - if (entry == query_exec_state_map_.end()) { + lock_guard<mutex> l(client_request_state_map_lock_); + ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id); + if (entry == client_request_state_map_.end()) { return Status("Invalid or unknown query handle"); } else { - exec_state = entry->second; + request_state = entry->second; } - query_exec_state_map_.erase(entry); + client_request_state_map_.erase(entry); } - exec_state->Done(); + request_state->Done(); double ut_end_time, ut_start_time; double duration_ms = 0.0; - if (LIKELY(exec_state->end_time().ToSubsecondUnixTime(&ut_end_time)) - && LIKELY(exec_state->start_time().ToSubsecondUnixTime(&ut_start_time))) { + if (LIKELY(request_state->end_time().ToSubsecondUnixTime(&ut_end_time)) + && LIKELY(request_state->start_time().ToSubsecondUnixTime(&ut_start_time))) { duration_ms = 1000 * (ut_end_time - ut_start_time); } // duration_ms can be negative when the local timezone changes during query execution. if (duration_ms >= 0) { - if (exec_state->stmt_type() == TStmtType::DDL) { + if (request_state->stmt_type() == TStmtType::DDL) { ImpaladMetrics::DDL_DURATIONS->Update(duration_ms); } else { ImpaladMetrics::QUERY_DURATIONS->Update(duration_ms); } } - LogQueryEvents(*exec_state.get()); + LogQueryEvents(*request_state.get()); { - lock_guard<mutex> l(exec_state->session()->lock); - exec_state->session()->inflight_queries.erase(query_id); + lock_guard<mutex> l(request_state->session()->lock); + request_state->session()->inflight_queries.erase(query_id); } - if (exec_state->coord() != nullptr) { - string exec_summary; - { - lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock()); - const TExecSummary& summary = exec_state->coord()->exec_summary(); - exec_summary = PrintExecSummary(summary); - } - exec_state->summary_profile()->AddInfoString("ExecSummary", exec_summary); - exec_state->summary_profile()->AddInfoString("Errors", - exec_state->coord()->GetErrorLog()); + if (request_state->coord() != nullptr) { + TExecSummary t_exec_summary; + request_state->coord()->GetTExecSummary(&t_exec_summary); + string exec_summary = PrintExecSummary(t_exec_summary); + request_state->summary_profile()->AddInfoString("ExecSummary", exec_summary); + request_state->summary_profile()->AddInfoString("Errors", + request_state->coord()->GetErrorLog()); const unordered_set<TNetworkAddress>& unique_hosts = - exec_state->schedule()->unique_hosts(); + request_state->schedule()->unique_hosts(); if (!unique_hosts.empty()) { lock_guard<mutex> l(query_locations_lock_); for (const TNetworkAddress& hostport: unique_hosts) { @@ -993,12 +986,12 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli // thing. They will harmlessly race to remove the query from this map. QueryLocations::iterator it = query_locations_.find(hostport); if (it != query_locations_.end()) { - it->second.erase(exec_state->query_id()); + it->second.erase(request_state->query_id()); } } } } - ArchiveQuery(*exec_state); + ArchiveQuery(*request_state); return Status::OK(); } @@ -1020,9 +1013,9 @@ Status ImpalaServer::UpdateCatalogMetrics() { Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflight, const Status* cause) { VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id); - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (exec_state == nullptr) return Status("Invalid or unknown query handle"); - exec_state->Cancel(check_inflight, cause); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + if (request_state == nullptr) return Status("Invalid or unknown query handle"); + RETURN_IF_ERROR(request_state->Cancel(check_inflight, cause)); return Status::OK(); } @@ -1061,7 +1054,8 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, // Unregister all open queries from this session. Status status("Session closed"); for (const TUniqueId& query_id: inflight_queries) { - UnregisterQuery(query_id, false, &status); + // TODO: deal with an error status + (void) UnregisterQuery(query_id, false, &status); } // Reconfigure the poll period of session_timeout_thread_ if necessary. int32_t session_timeout = session_state->session_timeout; @@ -1102,25 +1096,23 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id, void ImpalaServer::ReportExecStatus( TReportExecStatusResult& return_val, const TReportExecStatusParams& params) { - VLOG_FILE << "ReportExecStatus()" - << " instance_id=" << PrintId(params.fragment_instance_id) - << " done=" << (params.done ? "true" : "false"); + VLOG_FILE << "ReportExecStatus() coord_state_idx=" << params.coord_state_idx; // TODO: implement something more efficient here, we're currently // acquiring/releasing the map lock and doing a map lookup for // every report (assign each query a local int32_t id and use that to index into a - // vector of QueryExecStates, w/o lookup or locking?) - shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false); - if (exec_state.get() == nullptr) { + // vector of ClientRequestStates, w/o lookup or locking?) + shared_ptr<ClientRequestState> request_state = + GetClientRequestState(params.query_id, false); + if (request_state.get() == nullptr) { // This is expected occasionally (since a report RPC might be in flight while // cancellation is happening). Return an error to the caller to get it to stop. const string& err = Substitute("ReportExecStatus(): Received report for unknown " - "query ID (probably closed or cancelled). (instance: $0 done: $1)", - PrintId(params.fragment_instance_id), params.done); + "query ID (probably closed or cancelled): $0", PrintId(params.query_id)); Status(TErrorCode::INTERNAL_ERROR, err).SetTStatus(&return_val); - VLOG_QUERY << err; + //VLOG_QUERY << err; return; } - exec_state->coord()->UpdateFragmentExecStatus(params).SetTStatus(&return_val); + request_state->coord()->UpdateBackendExecStatus(params).SetTStatus(&return_val); } void ImpalaServer::TransmitData( @@ -1365,7 +1357,8 @@ void ImpalaServer::CatalogUpdateCallback( catalog_update_info_.catalog_service_id = resp.catalog_service_id; } ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0); - UpdateCatalogMetrics(); + // TODO: deal with an error status + (void) UpdateCatalogMetrics(); // Remove all dropped objects from the library cache. // TODO: is this expensive? We'd like to process heartbeats promptly. for (TCatalogObject& object: dropped_objects) { @@ -1629,39 +1622,39 @@ void ImpalaServer::AddLocalBackendToStatestore( } } -ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_state, +ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& request_state, bool copy_profile, const string& encoded_profile) { - id = exec_state.query_id(); - const TExecRequest& request = exec_state.exec_request(); + id = request_state.query_id(); + const TExecRequest& request = request_state.exec_request(); - const string* plan_str = exec_state.summary_profile().GetInfoString("Plan"); + const string* plan_str = request_state.summary_profile().GetInfoString("Plan"); if (plan_str != nullptr) plan = *plan_str; - stmt = exec_state.sql_stmt(); + stmt = request_state.sql_stmt(); stmt_type = request.stmt_type; - effective_user = exec_state.effective_user(); - default_db = exec_state.default_db(); - start_time = exec_state.start_time(); - end_time = exec_state.end_time(); + effective_user = request_state.effective_user(); + default_db = request_state.default_db(); + start_time = request_state.start_time(); + end_time = request_state.end_time(); has_coord = false; - Coordinator* coord = exec_state.coord(); + Coordinator* coord = request_state.coord(); if (coord != nullptr) { num_complete_fragments = coord->progress().num_complete(); total_fragments = coord->progress().total(); has_coord = true; } - query_state = exec_state.query_state(); - num_rows_fetched = exec_state.num_rows_fetched(); - query_status = exec_state.query_status(); + query_state = request_state.query_state(); + num_rows_fetched = request_state.num_rows_fetched(); + query_status = request_state.query_status(); - exec_state.query_events()->ToThrift(&event_sequence); + request_state.query_events()->ToThrift(&event_sequence); if (copy_profile) { stringstream ss; - exec_state.profile().PrettyPrint(&ss); + request_state.profile().PrettyPrint(&ss); profile_str = ss.str(); if (encoded_profile.empty()) { - encoded_profile_str = exec_state.profile().SerializeToArchiveString(); + encoded_profile_str = request_state.profile().SerializeToArchiveString(); } else { encoded_profile_str = encoded_profile; } @@ -1669,13 +1662,13 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat // Save the query fragments so that the plan can be visualised. for (const TPlanExecInfo& plan_exec_info: - exec_state.exec_request().query_exec_request.plan_exec_info) { + request_state.exec_request().query_exec_request.plan_exec_info) { fragments.insert(fragments.end(), plan_exec_info.fragments.begin(), plan_exec_info.fragments.end()); } - all_rows_returned = exec_state.eos(); - last_active_time_ms = exec_state.last_active_ms(); - request_pool = exec_state.request_pool(); + all_rows_returned = request_state.eos(); + last_active_time_ms = request_state.last_active_ms(); + request_pool = request_state.request_pool(); } bool ImpalaServer::QueryStateRecordLessThan::operator() ( @@ -1816,7 +1809,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { // The following block accomplishes three things: // // 1. Update the ordered list of queries by checking the 'idle_time' parameter in - // query_exec_state. We are able to avoid doing this for *every* query in flight + // client_request_state. We are able to avoid doing this for *every* query in flight // thanks to the observation that expiry times never move backwards, only // forwards. Therefore once we find a query that a) hasn't changed its idle time and // b) has not yet expired we can stop moving through the list. If the idle time has @@ -1838,8 +1831,8 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { // know that the true expiration time will be at least that far off. So we can // break here and sleep. if (expiration_event->first > now) break; - shared_ptr<QueryExecState> query_state = - GetQueryExecState(expiration_event->second, false); + shared_ptr<ClientRequestState> query_state = + GetClientRequestState(expiration_event->second, false); if (query_state.get() == nullptr) { // Query was deleted some other way. queries_by_timestamp_.erase(expiration_event++); @@ -1989,9 +1982,9 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id, TUniqueId* session_id) { DCHECK(session_id != nullptr); - lock_guard<mutex> l(query_exec_state_map_lock_); - QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id); - if (i == query_exec_state_map_.end()) { + lock_guard<mutex> l(client_request_state_map_lock_); + ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id); + if (i == client_request_state_map_.end()) { return false; } else { *session_id = i->second->session_id(); @@ -1999,12 +1992,12 @@ bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id, } } -shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState( +shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState( const TUniqueId& query_id, bool lock) { - lock_guard<mutex> l(query_exec_state_map_lock_); - QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id); - if (i == query_exec_state_map_.end()) { - return shared_ptr<QueryExecState>(); + lock_guard<mutex> l(client_request_state_map_lock_); + ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id); + if (i == client_request_state_map_.end()) { + return shared_ptr<ClientRequestState>(); } else { if (lock) i->second->lock()->lock(); return i->second; @@ -2013,12 +2006,15 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState( void ImpalaServer::UpdateFilter(TUpdateFilterResult& result, const TUpdateFilterParams& params) { - shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false); - if (query_exec_state.get() == nullptr) { - LOG(INFO) << "Could not find query exec state: " << params.query_id; + DCHECK(params.__isset.query_id); + DCHECK(params.__isset.filter_id); + shared_ptr<ClientRequestState> client_request_state = + GetClientRequestState(params.query_id, false); + if (client_request_state.get() == nullptr) { + LOG(INFO) << "Could not find client request state: " << params.query_id; return; } - query_exec_state->coord()->UpdateFilter(params); + client_request_state->coord()->UpdateFilter(params); } }
