This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8101b0784ccd1b87b85c1a055e92866614eb36a8 Author: jasonmfehr <[email protected]> AuthorDate: Fri Mar 21 15:20:43 2025 -0700 IMPALA-13885: Add Query Id to Workload Management Insert Logs When the workload management code inserts rows into the completed queries table, it logs an entry with information on that insert but does not include the query id in the log line. Without the query id, tracing a failed insert DML requires extra steps. This change adds the query id to both the success and failure log messages logged by the workload management main processing loop. It also adds the error message to the failure log messages. Additional minor cleanup was done to provide error messages on Python custom cluster test asserts, add assertions for the updated log messages, and restore a trailing newline in the workload management startup flags definition file. All test_query_log.py tests passed locally. These are the only tests that assert on the modified log messages. 
Change-Id: I3c0816f9eb6bac8c891fd0e249de8863115bf466 Reviewed-on: http://gerrit.cloudera.org:8080/22656 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/workload-management-worker.cc | 32 +++++++-------- be/src/workload_mgmt/workload-management-flags.cc | 2 +- tests/custom_cluster/test_query_log.py | 48 ++++++++++++++++++++--- 3 files changed, 57 insertions(+), 25 deletions(-) diff --git a/be/src/service/workload-management-worker.cc b/be/src/service/workload-management-worker.cc index 6ceb85532..80d161199 100644 --- a/be/src/service/workload-management-worker.cc +++ b/be/src/service/workload-management-worker.cc @@ -23,6 +23,7 @@ #include <list> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <utility> @@ -791,29 +792,24 @@ void ImpalaServer::WorkloadManagementWorker(const Version& target_schema_version uint64_t exec_time = timer.ElapsedTime(); ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(gather_time + exec_time); + + stringstream log_msg_props; + log_msg_props + << " table=\"" << log_table_name << "\"" + << " record_count=" << queries_to_insert.size() + << " bytes=" << PrettyPrinter::PrintBytes(sql.size()) + << " gather_time=" << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) + << " exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS) + << " query_id=\"" << PrintId(tmp_query_id) << "\""; + if (ret_status.ok()) { - LOG(INFO) << "wrote completed queries table=\"" << log_table_name - << "\" record_count=" - << queries_to_insert.size() - << " bytes=" - << PrettyPrinter::PrintBytes(sql.size()) - << " gather_time=" - << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) - << " exec_time=" - << PrettyPrinter::Print(exec_time, TUnit::TIME_NS); + LOG(INFO) << "wrote completed queries" << log_msg_props.rdbuf(); ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(queries_to_insert.size() * -1); 
DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0); ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(queries_to_insert.size()); } else { - LOG(WARNING) << "failed to write completed queries table=\"" << log_table_name - << "\" record_count=" - << queries_to_insert.size() - << " bytes=" - << PrettyPrinter::PrintBytes(sql.size()) - << " gather_time=" - << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) - << " exec_time=" - << PrettyPrinter::Print(exec_time, TUnit::TIME_NS); + log_msg_props << " msg=\"" << ret_status.msg().msg() << "\""; + LOG(WARNING) << "failed to write completed queries" << log_msg_props.rdbuf(); LOG(WARNING) << ret_status.GetDetail(); ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size()); { diff --git a/be/src/workload_mgmt/workload-management-flags.cc b/be/src/workload_mgmt/workload-management-flags.cc index 9d83d82d1..bb65653c7 100644 --- a/be/src/workload_mgmt/workload-management-flags.cc +++ b/be/src/workload_mgmt/workload-management-flags.cc @@ -197,4 +197,4 @@ DEFINE_validator(query_log_expression_limit, [](const char* name, const int32_t } return true; -}); \ No newline at end of file +}); diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index afe1a6501..6c8069505 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -574,13 +574,31 @@ class TestQueryLogTableHS2(WorkloadManagementTestSuite): "in_flight_queries", _is_insert_query) return self.insert_query_id - assert retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1) + assert \ + retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1), \ + "did not find completed queries insert dml in the debug web ui" self.assert_impalad_log_contains("INFO", "Expiring query {} due to execution time " "limit of 1s.".format(self.insert_query_id)) - self.assert_impalad_log_contains("INFO", "failed to write completed queries 
table=\"" - "{}\"".format(QUERY_TBL_LOG)) - self.assert_impalad_log_contains("INFO", "Query {} expired due to execution time " - "limit of 1s000ms".format(self.insert_query_id)) + + # When a workload management insert DML fails, a warning message is logged that + # contains information about the DML. These two assertions ensure the message is + # logged and important fields are correct. + res = self.assert_impalad_log_contains( + level="WARNING", + line_regex=r'failed to write completed queries table="{}" record_count=(\d+) ' + r'bytes=\S+\s\S+ gather_time=\S+ exec_time=\S+ query_id="{}" ' + r'msg="(.*?)"'.format(QUERY_TBL_LOG, self.insert_query_id), + expected_count=-1) + assert res.group(1) == "1", "Invalid record count in the query failed log line" + assert res.group(2) == "Query {0} expired due to execution time limit of 1s000ms" \ + .format(self.insert_query_id), \ + "Found expected query failed log line but the msg parameter " \ + "was incorrect" + + self.assert_impalad_log_contains(level="INFO", + line_regex="Query {} expired due to execution time " + "limit of 1s000ms".format(self.insert_query_id), + expected_count=2) class TestQueryLogTableAll(WorkloadManagementTestSuite): @@ -952,7 +970,9 @@ class TestQueryLogQueuedQueries(WorkloadManagementTestSuite): "in_flight_queries", _is_insert_query) return self.insert_query_id - assert retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1) + assert \ + retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1), \ + "did not find completed queries insert dml in the debug web ui" # Wait 2 seconds to ensure the insert into DML is not killed by the fetch rows # timeout (set to 1 second in this test's annotations). 
@@ -1089,6 +1109,22 @@ class TestQueryLogTableFlush(CustomClusterTestSuite): self.cluster.get_first_impalad().service.wait_for_metric_value( "impala-server.completed-queries.written", query_count, 20) + # Helper function that waits for the workload management insert DML to complete. + def wait_for_insert_query(): + self.insert_query_id = _find_query_in_ui(self.cluster.get_first_impalad().service, + "completed_queries", _is_insert_query) + return self.insert_query_id + + # Wait for the workload management insert dml to be in the debug ui's completed + # queries section so that its query id can be retrieved. + assert \ + retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, backoff=1), \ + "did not find completed queries insert dml in the debug web ui" + self.assert_impalad_log_contains("INFO", r"wrote completed queries " + r"table=\"{}\" record_count=\d+ bytes=\S+\s\S+ " + r"gather_time=\S+ exec_time=\S+ query_id=\"{}\"" + .format(QUERY_TBL_LOG, self.insert_query_id)) + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=9999 " "--shutdown_grace_period_s=0 " "--shutdown_deadline_s=15 "
