IMPALA-7545: Add queuing reason to query log

After this change, the HS2 GetLog() function returns the queuing reason for a query when it is queued by the AdmissionController.
Testing: Added an end-to-end test to test_admission_controller.py to verify the query logs returned. Change-Id: I2e5d8de4f6691a9ba2594ca68c54ea4dca760545 Reviewed-on: http://gerrit.cloudera.org:8080/11669 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ec2dabaf Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ec2dabaf Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ec2dabaf Branch: refs/heads/master Commit: ec2dabafb989e2aca0fddb8f6c467e6f551d0424 Parents: 97731e5 Author: poojanilangekar <pooja.nilange...@cloudera.com> Authored: Fri Oct 12 10:48:49 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Tue Oct 16 01:47:05 2018 +0000 ---------------------------------------------------------------------- be/src/scheduling/admission-controller.cc | 28 +++++++------ be/src/scheduling/admission-controller.h | 13 +++++++ be/src/service/impala-hs2-server.cc | 20 +++++++++- .../custom_cluster/test_admission_controller.py | 41 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/ec2dabaf/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index e2b5415..000429f 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -102,17 +102,23 @@ const string QUERY_EVENT_QUEUED = "Queued"; const string QUERY_EVENT_COMPLETED_ADMISSION = "Completed admission"; // Profile info strings -const string PROFILE_INFO_KEY_ADMISSION_RESULT = "Admission result"; -const string 
PROFILE_INFO_VAL_ADMIT_IMMEDIATELY = "Admitted immediately"; -const string PROFILE_INFO_VAL_QUEUED = "Queued"; -const string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE= "Cancelled (queued)"; -const string PROFILE_INFO_VAL_ADMIT_QUEUED = "Admitted (queued)"; -const string PROFILE_INFO_VAL_REJECTED = "Rejected"; -const string PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)"; -const string PROFILE_INFO_KEY_INITIAL_QUEUE_REASON = "Initial admission queue reason"; -const string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON = "waited $0 ms, reason: $1"; -const string PROFILE_INFO_KEY_LAST_QUEUED_REASON = "Latest admission queue reason"; -const string PROFILE_INFO_KEY_ADMITTED_MEM = "Cluster Memory Admitted"; +const string AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT = "Admission result"; +const string AdmissionController::PROFILE_INFO_VAL_ADMIT_IMMEDIATELY = + "Admitted immediately"; +const string AdmissionController::PROFILE_INFO_VAL_QUEUED = "Queued"; +const string AdmissionController::PROFILE_INFO_VAL_CANCELLED_IN_QUEUE = + "Cancelled (queued)"; +const string AdmissionController::PROFILE_INFO_VAL_ADMIT_QUEUED = "Admitted (queued)"; +const string AdmissionController::PROFILE_INFO_VAL_REJECTED = "Rejected"; +const string AdmissionController::PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)"; +const string AdmissionController::PROFILE_INFO_KEY_INITIAL_QUEUE_REASON = + "Initial admission queue reason"; +const string AdmissionController::PROFILE_INFO_VAL_INITIAL_QUEUE_REASON = + "waited $0 ms, reason: $1"; +const string AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON = + "Latest admission queue reason"; +const string AdmissionController::PROFILE_INFO_KEY_ADMITTED_MEM = + "Cluster Memory Admitted"; // Error status string details const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION = http://git-wip-us.apache.org/repos/asf/impala/blob/ec2dabaf/be/src/scheduling/admission-controller.h ---------------------------------------------------------------------- diff --git 
a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index b88356c..5b741b9 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -214,6 +214,19 @@ enum class AdmissionOutcome { class AdmissionController { public: + // Profile info strings + static const string PROFILE_INFO_KEY_ADMISSION_RESULT; + static const string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY; + static const string PROFILE_INFO_VAL_QUEUED; + static const string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE; + static const string PROFILE_INFO_VAL_ADMIT_QUEUED; + static const string PROFILE_INFO_VAL_REJECTED; + static const string PROFILE_INFO_VAL_TIME_OUT; + static const string PROFILE_INFO_KEY_INITIAL_QUEUE_REASON; + static const string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON; + static const string PROFILE_INFO_KEY_LAST_QUEUED_REASON; + static const string PROFILE_INFO_KEY_ADMITTED_MEM; + AdmissionController(StatestoreSubscriber* subscriber, RequestPoolService* request_pool_service, MetricGroup* metrics, const TNetworkAddress& host_addr); http://git-wip-us.apache.org/repos/asf/impala/blob/ec2dabaf/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 881b19a..f2e401e 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -35,10 +35,11 @@ #include "common/version.h" #include "rpc/thrift-util.h" #include "runtime/coordinator.h" -#include "runtime/raw-value.h" #include "runtime/exec-env.h" -#include "service/hs2-util.h" +#include "runtime/raw-value.h" +#include "scheduling/admission-controller.h" #include "service/client-request-state.h" +#include "service/hs2-util.h" #include "service/query-options.h" #include "service/query-result-set.h" #include "util/auth-util.h" @@ -826,6 +827,21 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& 
request) { } // Report analysis errors ss << join(request_state->GetAnalysisWarnings(), "\n"); + // Report queuing reason if the admission controller queued the query. + const string* admission_result = request_state->summary_profile()->GetInfoString( + AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT); + if (admission_result != nullptr) { + if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) { + ss << AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT << " : " + << *admission_result << "\n"; + const string* queued_reason = request_state->summary_profile()->GetInfoString( + AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON); + if (queued_reason != nullptr) { + ss << AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON << " : " + << *queued_reason << "\n"; + } + } + } if (coord != nullptr) { // Report execution errors ss << coord->GetErrorLog(); http://git-wip-us.apache.org/repos/asf/impala/blob/ec2dabaf/tests/custom_cluster/test_admission_controller.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 56d2b0b..fc74f00 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -817,6 +817,47 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): assert False, "Timed out waiting for change to profile\nSearch " \ "String: {0}\nProfile:\n{1}".format(search_string, str(profile)) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10, + pool_max_mem=1024 * 1024 * 1024)) + @needs_session() + def test_hs2_admission_controller_logs(self): + """Test to verify that the GetLog() function invoked by the HS2 client returns the + reason for queuing of the query.""" + # Start a long running query. 
+ long_query_req = TCLIService.TExecuteStatementReq() + long_query_req.sessionHandle = self.session_handle + long_query_req.statement = "select sleep(1000000)" + long_query_resp = self.hs2_client.ExecuteStatement(long_query_req) + HS2TestSuite.check_response(long_query_resp) + # Ensure that the query is running. + self.wait_for_admission_control(long_query_resp.operationHandle) + # Submit another query. + execute_statement_req = TCLIService.TExecuteStatementReq() + execute_statement_req.sessionHandle = self.session_handle + execute_statement_req.statement = "select 1" + execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req) + HS2TestSuite.check_response(execute_statement_resp) + # Wait until the query is queued. + self.wait_for_operation_state(execute_statement_resp.operationHandle, + TCLIService.TOperationState.PENDING_STATE) + # Ensure that the log message contains the queuing reason. + get_log_req = TCLIService.TGetLogReq() + get_log_req.operationHandle = execute_statement_resp.operationHandle + log = self.hs2_client.GetLog(get_log_req).log + assert "Admission result : Queued" in log, log + assert "Latest admission queue reason : number of running queries 1 is at or over " + "limit 1" in log, log + # Close the running query. + close_operation_req = TCLIService.TCloseOperationReq() + close_operation_req.operationHandle = long_query_resp.operationHandle + HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req)) + # Close the queued query. + close_operation_req = TCLIService.TCloseOperationReq() + close_operation_req.operationHandle = execute_statement_resp.operationHandle + HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req)) + class TestAdmissionControllerStress(TestAdmissionControllerBase): """Submits a number of queries (parameterized) with some delay between submissions