This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8101b0784ccd1b87b85c1a055e92866614eb36a8
Author: jasonmfehr <[email protected]>
AuthorDate: Fri Mar 21 15:20:43 2025 -0700

    IMPALA-13885: Add Query Id to Workload Management Insert Logs
    
    When the workload management code inserts rows into the completed
    queries table, it logs an entry with information on that insert but
    does not include the query id in the log line. Without the query id,
    tracing cases where the insert DML failed requires extra steps.
    
    This change adds the query id to both the success and failure log
    messages logged by the workload management main processing loop. It
    also adds the error message to the failure log messages.
    
    Additional minor cleanup was done to provide error messages on Python
    custom cluster test asserts, add additional asserts for the updated
    log messages, and restore a trailing newline in the workload
    management startup flags definition file.
    
    All test_query_log.py tests passed locally. These tests were the only
    tests that asserted the log messages that were modified.
    
    Change-Id: I3c0816f9eb6bac8c891fd0e249de8863115bf466
    Reviewed-on: http://gerrit.cloudera.org:8080/22656
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/workload-management-worker.cc      | 32 +++++++--------
 be/src/workload_mgmt/workload-management-flags.cc |  2 +-
 tests/custom_cluster/test_query_log.py            | 48 ++++++++++++++++++++---
 3 files changed, 57 insertions(+), 25 deletions(-)

diff --git a/be/src/service/workload-management-worker.cc 
b/be/src/service/workload-management-worker.cc
index 6ceb85532..80d161199 100644
--- a/be/src/service/workload-management-worker.cc
+++ b/be/src/service/workload-management-worker.cc
@@ -23,6 +23,7 @@
 #include <list>
 #include <memory>
 #include <mutex>
+#include <sstream>
 #include <string>
 #include <utility>
 
@@ -791,29 +792,24 @@ void ImpalaServer::WorkloadManagementWorker(const 
Version& target_schema_version
 
     uint64_t exec_time = timer.ElapsedTime();
     ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(gather_time + 
exec_time);
+
+    stringstream log_msg_props;
+    log_msg_props
+        << " table=\"" << log_table_name << "\""
+        << " record_count=" << queries_to_insert.size()
+        << " bytes=" << PrettyPrinter::PrintBytes(sql.size())
+        << " gather_time=" << PrettyPrinter::Print(gather_time, TUnit::TIME_NS)
+        << " exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS)
+        << " query_id=\"" << PrintId(tmp_query_id) << "\"";
+
     if (ret_status.ok()) {
-      LOG(INFO) << "wrote completed queries table=\"" << log_table_name
-                << "\" record_count="
-                << queries_to_insert.size()
-                << " bytes="
-                << PrettyPrinter::PrintBytes(sql.size())
-                << " gather_time="
-                << PrettyPrinter::Print(gather_time, TUnit::TIME_NS)
-                << " exec_time="
-                << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
+      LOG(INFO) << "wrote completed queries" << log_msg_props.rdbuf();
       
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(queries_to_insert.size() * 
-1);
       DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0);
       
ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(queries_to_insert.size());
     } else {
-      LOG(WARNING) << "failed to write completed queries table=\"" << 
log_table_name
-                   << "\" record_count="
-                   << queries_to_insert.size()
-                   << " bytes="
-                   << PrettyPrinter::PrintBytes(sql.size())
-                   << " gather_time="
-                   << PrettyPrinter::Print(gather_time, TUnit::TIME_NS)
-                   << " exec_time="
-                   << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
+      log_msg_props << " msg=\"" << ret_status.msg().msg() << "\"";
+      LOG(WARNING) << "failed to write completed queries" << 
log_msg_props.rdbuf();
       LOG(WARNING) << ret_status.GetDetail();
       
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
       {
diff --git a/be/src/workload_mgmt/workload-management-flags.cc 
b/be/src/workload_mgmt/workload-management-flags.cc
index 9d83d82d1..bb65653c7 100644
--- a/be/src/workload_mgmt/workload-management-flags.cc
+++ b/be/src/workload_mgmt/workload-management-flags.cc
@@ -197,4 +197,4 @@ DEFINE_validator(query_log_expression_limit, [](const char* 
name, const int32_t
   }
 
   return true;
-});
\ No newline at end of file
+});
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index afe1a6501..6c8069505 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -574,13 +574,31 @@ class TestQueryLogTableHS2(WorkloadManagementTestSuite):
           "in_flight_queries", _is_insert_query)
       return self.insert_query_id
 
-    assert retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, 
backoff=1)
+    assert \
+        retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, 
backoff=1), \
+        "did not find completed queries insert dml in the debug web ui"
     self.assert_impalad_log_contains("INFO", "Expiring query {} due to 
execution time "
                                      "limit of 
1s.".format(self.insert_query_id))
-    self.assert_impalad_log_contains("INFO", "failed to write completed 
queries table=\""
-                                     "{}\"".format(QUERY_TBL_LOG))
-    self.assert_impalad_log_contains("INFO", "Query {} expired due to 
execution time "
-                                     "limit of 
1s000ms".format(self.insert_query_id))
+
+    # When a workload management insert DML fails, a warning message is logged 
that
+    # contains information about the DML. These two assertions ensure the 
message is
+    # logged and important fields are correct.
+    res = self.assert_impalad_log_contains(
+        level="WARNING",
+        line_regex=r'failed to write completed queries table="{}" 
record_count=(\d+) '
+                   r'bytes=\S+\s\S+ gather_time=\S+ exec_time=\S+ 
query_id="{}" '
+                   r'msg="(.*?)"'.format(QUERY_TBL_LOG, self.insert_query_id),
+        expected_count=-1)
+    assert res.group(1) == "1", "Invalid record count in the query failed log 
line"
+    assert res.group(2) == "Query {0} expired due to execution time limit of 
1s000ms" \
+                           .format(self.insert_query_id), \
+                           "Found expected query failed log line but the msg 
parameter " \
+                           "was incorrect"
+
+    self.assert_impalad_log_contains(level="INFO",
+                                     line_regex="Query {} expired due to 
execution time "
+                                     "limit of 
1s000ms".format(self.insert_query_id),
+                                     expected_count=2)
 
 
 class TestQueryLogTableAll(WorkloadManagementTestSuite):
@@ -952,7 +970,9 @@ class 
TestQueryLogQueuedQueries(WorkloadManagementTestSuite):
           "in_flight_queries", _is_insert_query)
       return self.insert_query_id
 
-    assert retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, 
backoff=1)
+    assert \
+        retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, 
backoff=1), \
+        "did not find completed queries insert dml in the debug web ui"
 
     # Wait 2 seconds to ensure the insert into DML is not killed by the fetch 
rows
     # timeout (set to 1 second in this test's annotations).
@@ -1089,6 +1109,22 @@ class TestQueryLogTableFlush(CustomClusterTestSuite):
     self.cluster.get_first_impalad().service.wait_for_metric_value(
       "impala-server.completed-queries.written", query_count, 20)
 
+    # Helper function that waits for the workload management insert DML to 
start.
+    def wait_for_insert_query():
+      self.insert_query_id = 
_find_query_in_ui(self.cluster.get_first_impalad().service,
+          "completed_queries", _is_insert_query)
+      return self.insert_query_id
+
+    # Wait for the workload management insert dml to be in the debug ui's 
completed
+    # queries section so that its query id can be retrieved.
+    assert \
+        retry(func=wait_for_insert_query, max_attempts=10, sleep_time_s=1, 
backoff=1), \
+        "did not find completed queries insert dml in the debug web ui"
+    self.assert_impalad_log_contains("INFO", r"wrote completed queries "
+                                     r"table=\"{}\" record_count=\d+ 
bytes=\S+\s\S+ "
+                                     r"gather_time=\S+ exec_time=\S+ 
query_id=\"{}\""
+                                     .format(QUERY_TBL_LOG, 
self.insert_query_id))
+
   
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=9999
 "
                                                  "--shutdown_grace_period_s=0 "
                                                  "--shutdown_deadline_s=15 "

Reply via email to