This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c529b855e95e5b4fe4fed504afa4e34b3e2481dc
Author: Michael Smith <[email protected]>
AuthorDate: Wed Jan 10 16:57:17 2024 -0800

    IMPALA-12626: Add Tables Queried to profile/history
    
    Adds "Tables Queried" to the query profile, enumerating a
    comma-separated list of tables accessed during a query:
    
      Tables Queried: tpch.customer,tpch.lineitem
    
    Also adds "tables_queried" to impala_query_log and impala_query_live
    with the same content.
    
    Requires running 'drop table sys.impala_query_log' so that the table is
    recreated with the new column.
    
    Change-Id: I9c9c80b2adf7f3e44225a191fe8eb9df3c4bc5aa
    Reviewed-on: http://gerrit.cloudera.org:8080/20886
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/system-table-scanner.cc                |  5 +++
 be/src/service/client-request-state.cc             | 36 +++++++---------------
 be/src/service/client-request-state.h              |  1 +
 be/src/service/query-state-record.cc               |  1 +
 be/src/service/query-state-record.h                |  3 ++
 be/src/service/workload-management-fields.cc       |  6 ++++
 be/src/util/debug-util.cc                          |  9 ++++++
 be/src/util/debug-util.h                           |  3 ++
 be/src/util/error-util.cc                          |  8 ++---
 common/thrift/Frontend.thrift                      |  2 ++
 common/thrift/SystemTables.thrift                  |  1 +
 .../java/org/apache/impala/service/Frontend.java   |  3 ++
 tests/custom_cluster/test_query_live.py            | 12 ++++++++
 tests/query_test/test_observability.py             | 14 +++++++++
 tests/util/workload_management.py                  | 15 ++++++++-
 15 files changed, 87 insertions(+), 32 deletions(-)

diff --git a/be/src/exec/system-table-scanner.cc 
b/be/src/exec/system-table-scanner.cc
index 3d0dbad6b..fa91fb165 100644
--- a/be/src/exec/system-table-scanner.cc
+++ b/be/src/exec/system-table-scanner.cc
@@ -335,6 +335,11 @@ Status QueryScanner::MaterializeNextTuple(
         RETURN_IF_ERROR(WriteStringSlot(
             trim_left_copy_if(record.plan, is_any_of("\n")), pool, slot));
         break;
+      case TQueryTableColumn::TABLES_QUERIED:
+        if (!query.tables.empty()) {
+          RETURN_IF_ERROR(WriteStringSlot(PrintTableList(query.tables), pool, 
slot));
+        }
+        break;
       default:
         DCHECK(false) << "Unknown column position " << slot_desc->col_pos();
     }
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 538fde8c1..3704716a3 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -229,12 +229,16 @@ void ClientRequestState::SetBlacklistedExecutorAddresses(
 Status ClientRequestState::Exec() {
   MarkActive();
 
+  const TExecRequest& exec_req = exec_request();
   profile_->AddChild(server_profile_);
   summary_profile_->AddInfoString("Query Type", PrintValue(stmt_type()));
   summary_profile_->AddInfoString("Query Options (set by configuration)",
       DebugQueryOptions(query_ctx_.client_request.query_options));
   summary_profile_->AddInfoString("Query Options (set by configuration and 
planner)",
-      DebugQueryOptions(exec_request().query_options));
+      DebugQueryOptions(exec_req.query_options));
+  if (!exec_req.tables.empty()) {
+    summary_profile_->AddInfoString("Tables Queried", 
PrintTableList(exec_req.tables));
+  }
   if (query_ctx_.__isset.overridden_mt_dop_value) {
     DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop);
     summary_profile_->AddInfoString("MT_DOP limited by admission control",
@@ -243,7 +247,6 @@ Status ClientRequestState::Exec() {
             query_ctx_.client_request.query_options.mt_dop));
   }
 
-  const TExecRequest& exec_req = exec_request();
   switch (exec_req.stmt_type) {
     case TStmtType::QUERY:
     case TStmtType::DML:
@@ -533,38 +536,21 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
       query_exec_request.query_ctx.__isset.tables_missing_stats &&
       !query_exec_request.query_ctx.tables_missing_stats.empty()) {
-    stringstream ss;
-    const vector<TTableName>& tbls = 
query_exec_request.query_ctx.tables_missing_stats;
-    for (int i = 0; i < tbls.size(); ++i) {
-      if (i != 0) ss << ",";
-      ss << tbls[i].db_name << "." << tbls[i].table_name;
-    }
-    summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY,
+        PrintTableList(query_exec_request.query_ctx.tables_missing_stats));
   }
 
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
       query_exec_request.query_ctx.__isset.tables_with_corrupt_stats &&
       !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) {
-    stringstream ss;
-    const vector<TTableName>& tbls =
-        query_exec_request.query_ctx.tables_with_corrupt_stats;
-    for (int i = 0; i < tbls.size(); ++i) {
-      if (i != 0) ss << ",";
-      ss << tbls[i].db_name << "." << tbls[i].table_name;
-    }
-    summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
+    summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY,
+        
PrintTableList(query_exec_request.query_ctx.tables_with_corrupt_stats));
   }
 
   if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
       !query_exec_request.query_ctx.tables_missing_diskids.empty()) {
-    stringstream ss;
-    const vector<TTableName>& tbls =
-        query_exec_request.query_ctx.tables_missing_diskids;
-    for (int i = 0; i < tbls.size(); ++i) {
-      if (i != 0) ss << ",";
-      ss << tbls[i].db_name << "." << tbls[i].table_name;
-    }
-    summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, 
ss.str());
+    summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY,
+        PrintTableList(query_exec_request.query_ctx.tables_missing_diskids));
   }
 
   {
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index d0d1b3d78..c96fcc73e 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -334,6 +334,7 @@ class ClientRequestState {
   }
   /// Returns 0:0 if this is a root query
   TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
+  const vector<TTableName>& tables() const { return exec_request().tables; }
 
   const std::vector<std::string>& GetAnalysisWarnings() const {
     return exec_request().analysis_warnings;
diff --git a/be/src/service/query-state-record.cc 
b/be/src/service/query-state-record.cc
index 0b594df83..8c7458a03 100644
--- a/be/src/service/query-state-record.cc
+++ b/be/src/service/query-state-record.cc
@@ -203,6 +203,7 @@ QueryStateExpanded::QueryStateExpanded(const 
ClientRequestState& exec_state,
       .query_exec_request.dedicated_coord_mem_estimate;
   row_materialization_rate = exec_state.row_materialization_rate();
   row_materialization_time = exec_state.row_materialization_timer();
+  tables = exec_state.tables();
 
   // Update name_rows_fetched with the final count after query close.
   base_state->num_rows_fetched = exec_state.num_rows_fetched_counter();
diff --git a/be/src/service/query-state-record.h 
b/be/src/service/query-state-record.h
index c3fec5518..6ada7a0e9 100644
--- a/be/src/service/query-state-record.h
+++ b/be/src/service/query-state-record.h
@@ -340,6 +340,9 @@ struct QueryStateExpanded {
   /// Events Timeline Iterator
   EventsTimelineIterator EventsTimeline() const;
 
+  // Source tables accessed by this query.
+  std::vector<TTableName> tables;
+
   /// Required data will be copied from the provided ClientRequestState into 
members of
   /// the struct.
   QueryStateExpanded(const ClientRequestState& exec_state,
diff --git a/be/src/service/workload-management-fields.cc 
b/be/src/service/workload-management-fields.cc
index 19ef98fb7..a95bb671c 100644
--- a/be/src/service/workload-management-fields.cc
+++ b/be/src/service/workload-management-fields.cc
@@ -411,6 +411,12 @@ const std::list<FieldDefinition> FIELD_DEFINITIONS = {
               << "'";
         }),
 
+    // Tables Queried
+    FieldDefinition("tables_queried", TPrimitiveType::STRING,
+        [](FieldParserContext& ctx){
+          ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'";
+        }),
+
     }; // FIELDS_PARSERS constant list
 
 } //namespace workload_management
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 927319de8..005bf1c7e 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -277,6 +277,15 @@ string PrintNumericPath(const SchemaPath& path) {
   return ss.str();
 }
 
+string PrintTableList(const vector<TTableName>& tbls) {
+  stringstream ss;
+  for (int i = 0; i < tbls.size(); ++i) {
+    if (i != 0) ss << ",";
+    ss << tbls[i].db_name << "." << tbls[i].table_name;
+  }
+  return ss.str();
+}
+
 string GetBuildVersion(bool compact) {
   stringstream ss;
   ss << GetDaemonBuildVersion()
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 4179eefcd..4d7255e45 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -91,6 +91,9 @@ template<typename ThriftStruct> std::string PrintThrift(const 
ThriftStruct& t) {
   return apache::thrift::ThriftDebugString(t);
 }
 
+/// Return a list of TTableName as a comma-separated string.
+std::string PrintTableList(const std::vector<TTableName>& tbls);
+
 /// Parse 's' into a TUniqueId object.  The format of s needs to be the output 
format
 /// from PrintId.  (<hi_part>:<low_part>)
 /// Returns true if parse succeeded.
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index 430f2c833..0d9443b20 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "util/error-util-internal.h"
+#include "util/debug-util.h"
 #include "util/string-util.h"
 
 #include <errno.h>
@@ -47,12 +48,7 @@ string GetTablesMissingStatsWarning(const 
vector<TTableName>& tables_missing_sta
   stringstream ss;
   if (tables_missing_stats.empty()) return string("");
   ss << "WARNING: The following tables are missing relevant table and/or 
column "
-     << "statistics.\n";
-  for (int i = 0; i < tables_missing_stats.size(); ++i) {
-    const TTableName& table_name = tables_missing_stats[i];
-    if (i != 0) ss << ",";
-    ss << table_name.db_name << "." << table_name.table_name;
-  }
+     << "statistics.\n" << PrintTableList(tables_missing_stats);
   return ss.str();
 }
 
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index d564fd631..93a0d92bf 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -678,6 +678,8 @@ struct TExecRequest {
 
   // Request for "ALTER TABLE ... CONVERT TO" statements.
   19: optional TConvertTableRequest convert_table_request
+
+  20: optional list<CatalogObjects.TTableName> tables
 }
 
 // Parameters to FeSupport.cacheJar().
diff --git a/common/thrift/SystemTables.thrift 
b/common/thrift/SystemTables.thrift
index 224fcca41..3ca81db47 100644
--- a/common/thrift/SystemTables.thrift
+++ b/common/thrift/SystemTables.thrift
@@ -68,4 +68,5 @@ enum TQueryTableColumn {
     PERNODE_PEAK_MEM_MEAN
     SQL
     PLAN
+    TABLES_QUERIED
 }
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index f68ecb2eb..2c7649430 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2412,6 +2412,9 @@ public class Frontend {
     }
     Preconditions.checkNotNull(analysisResult.getStmt());
     TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
+    for (TableName table : stmtTableCache.tables.keySet()) {
+      result.addToTables(table.toThrift());
+    }
 
     // Transfer the expected number of executors in executor group set to
     // analyzer's global state. The info is needed to compute the number of 
nodes to be
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index ee8c17e0c..ca44122e0 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -98,6 +98,18 @@ class TestQueryLive(CustomClusterTestSuite):
         'select * from sys.impala_query_live where cluster_id = 
"test_query_live_0"')
     assert len(result5.data) == 0
 
+    result = self.execute_query("""
+        select count(*) from functional.alltypestiny a
+          inner join functional.alltypes b on a.id = b.id
+          inner join functional.alltypessmall c on b.id = c.id
+    """)
+    result5 = self.execute_query(
+        'select tables_queried from sys.impala_query_live where query_id = "'
+        + result.query_id + '"')
+    assert len(result5.data) == 1
+    assert result5.data[0] == \
+        "functional.alltypes,functional.alltypestiny,functional.alltypessmall"
+
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",
diff --git a/tests/query_test/test_observability.py 
b/tests/query_test/test_observability.py
index b11453585..60761e57c 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -188,6 +188,20 @@ class TestObservability(ImpalaTestSuite):
     expected_str = expected_str.format(timezone=server_timezone)
     assert expected_str in profile, profile
 
+  def test_profile(self):
+    """Test that expected fields are populated in the profile."""
+    query = """select count(distinct a.int_col) from functional.alltypes a
+        inner join functional.alltypessmall b on (a.id = b.id + cast(sleep(15) 
as INT))"""
+    result = self.execute_query(query)
+
+    assert "Query Type: QUERY" in result.runtime_profile
+    assert "Query State: " in result.runtime_profile
+    assert "Default Db: default" in result.runtime_profile
+    tables = re.search(r'\n\s+Tables Queried:\s+(.*?)\n', 
result.runtime_profile)
+    assert tables is not None
+    assert sorted(tables.group(1).split(",")) \
+        == ["functional.alltypes", "functional.alltypessmall"]
+
   def test_exec_summary(self):
     """Test that the exec summary is populated correctly in every query 
state"""
     query = "select count(*) from functional.alltypes"
diff --git a/tests/util/workload_management.py 
b/tests/util/workload_management.py
index 900e523b3..2ee3eba3f 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -25,7 +25,7 @@ from tests.util.assert_time import assert_time_str, 
convert_to_nanos
 from tests.util.memory import assert_byte_str, convert_to_bytes
 
 DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600
-EXPECTED_QUERY_COLS = 48
+EXPECTED_QUERY_COLS = 49
 
 
 CLUSTER_ID = "CLUSTER_ID"
@@ -76,6 +76,7 @@ PERNODE_PEAK_MEM_MAX = "PERNODE_PEAK_MEM_MAX"
 PERNODE_PEAK_MEM_MEAN = "PERNODE_PEAK_MEM_MEAN"
 SQL = "SQL"
 PLAN = "PLAN"
+TABLES_QUERIED = "TABLES_QUERIED"
 
 
 def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, 
impalad=None,
@@ -699,6 +700,18 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
     assert plan is not None
     assert data[index] == plan.group(1)
 
+  # Tables Queried
+  index += 1
+  assert sql_results.column_labels[index] == TABLES_QUERIED
+  ret_data[TABLES_QUERIED] = data[index]
+  tables = re.search(r'\n\s+Tables Queried:\s+(.*?)\n', profile_text)
+  if query_state_value == "EXCEPTION" or query_type == "DDL":
+    assert tables is None
+    assert data[index] == ""
+  else:
+    assert tables is not None
+    assert data[index] == tables.group(1)
+
   return ret_data
 # function assert_query
 

Reply via email to