This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 75ca356ed0451e1a98f70433a4d2729018b68d32
Author: jasonmfehr <jf...@cloudera.com>
AuthorDate: Tue Aug 12 14:30:16 2025 -0700

    IMPALA-13237: [Patch 6] - Match Behavior of Older Impala Versions
    
    Older versions of Impala set exec_state_ in ClientRequestState to
    the error state earlier in the query processing than later versions.
    This difference determines which OpenTelemetry trace child spans
    report an error status.
    
    For example, in older Impala versions, if a query specifies a column
    that does not exist, then the Planning child span has a status of
    ERROR. However, in the latest version, the Planning span has a status of
    OK, and only the Close span has a status of ERROR.
    
    This difference caused the custom cluster test
    test_otel_trace.py::TestOtelTrace::test_invalid_sql to fail in the
    older Impala versions but pass in the latest version.
    
    Additionally, older versions of Impala report a different default db.
    The latest version reports whatever the client set. This difference
    caused test_otel_trace.py::TestOtelTrace::test_retry_select_success
    and test_otel_trace.py::TestOtelTrace::test_retry_select_failed to
    fail in the older Impala versions because Impala used "tpch" as the
    default db while the latest version used "default".
    
    This change causes the OpenTelemetry trace child span where an error
    actually occurs to report an error status, matching the behavior
    of older Impala versions.
    
    It also modifies test_otel_trace.py to expect the default db in the
    OpenTelemetry trace "DefaultDb" attribute to match the query profile.
    
    Testing accomplished by running the test_otel_trace.py custom cluster
    tests.
    
    Change-Id: If57aaef2da6d6904c66d0150f50ea9ac1c3ebc8c
    Reviewed-on: http://gerrit.cloudera.org:8080/23293
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/observe/span-manager.cc          |   7 ++-
 tests/custom_cluster/test_otel_trace.py | 106 +++++++++++++++-----------------
 tests/util/query_profile_util.py        |   7 +++
 3 files changed, 59 insertions(+), 61 deletions(-)

diff --git a/be/src/observe/span-manager.cc b/be/src/observe/span-manager.cc
index 3b4218937..38f42d820 100644
--- a/be/src/observe/span-manager.cc
+++ b/be/src/observe/span-manager.cc
@@ -403,9 +403,6 @@ void SpanManager::EndChildSpan(const Status* cause,
       current_child_->SetAttribute(a.first, a.second);
     }
 
-    current_child_->SetAttribute(ATTR_STATUS,
-        ClientRequestState::ExecStateToString(client_request_state_->exec_state()));
-
     const Status* query_status;
     if (cause != nullptr) {
       query_status = cause;
@@ -415,6 +412,8 @@ void SpanManager::EndChildSpan(const Status* cause,
 
     if (query_status->ok()) {
       current_child_->SetAttributeEmpty(ATTR_ERROR_MSG);
+      current_child_->SetAttribute(ATTR_STATUS,
+        ClientRequestState::ExecStateToString(client_request_state_->exec_state()));
     } else {
       string error_msg = query_status->msg().msg();
 
@@ -423,6 +422,8 @@ void SpanManager::EndChildSpan(const Status* cause,
       }
 
       current_child_->SetAttribute(ATTR_ERROR_MSG, error_msg);
+      current_child_->SetAttribute(ATTR_STATUS,
+            ClientRequestState::ExecStateToString(ClientRequestState::ExecState::ERROR));
     }
 
     current_child_->End();
diff --git a/tests/custom_cluster/test_otel_trace.py b/tests/custom_cluster/test_otel_trace.py
index 0a948d7f1..25647011c 100644
--- a/tests/custom_cluster/test_otel_trace.py
+++ b/tests/custom_cluster/test_otel_trace.py
@@ -19,14 +19,13 @@ from __future__ import absolute_import, division, print_function
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.file_utils import wait_for_file_line_count
-from tests.common.impala_connection import ERROR, RUNNING, FINISHED
-from tests.common.test_vector import PROTOCOL, HS2, BEESWAX, ImpalaTestDimension
+from tests.common.impala_connection import ERROR, RUNNING, FINISHED, INITIALIZED
 from tests.util.otel_trace import parse_trace_file, ATTR_VAL_TYPE_STRING, \
     ATTR_VAL_TYPE_INT, ATTR_VAL_TYPE_BOOL
 from tests.util.query_profile_util import parse_db_user, parse_session_id, parse_sql, \
     parse_query_type, parse_query_status, parse_impala_query_state, parse_query_id, \
     parse_retry_status, parse_original_query_id, parse_retried_query_id, \
-    parse_num_rows_fetched, parse_admission_result
+    parse_num_rows_fetched, parse_admission_result, parse_default_db
 from tests.util.retry import retry
 
 
@@ -36,11 +35,6 @@ class TestOtelTrace(CustomClusterTestSuite):
   OUT_DIR = "out_dir"
   TRACE_FILE = "export-trace.jsonl"
 
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestOtelTrace, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(PROTOCOL, HS2, BEESWAX))
-
   def setup_method(self, method):
     super(TestOtelTrace, self).setup_method(method)
 
@@ -49,12 +43,11 @@ class TestOtelTrace(CustomClusterTestSuite):
                    "--otel_trace_exporter=file --otel_file_flush_interval_ms=500 "
                    "--otel_file_pattern={out_dir}/" + TRACE_FILE,
       cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
-  def test_query_success(self, vector):
+  def test_query_success(self):
     """Test that OpenTelemetry tracing is working by running a simple query and
     checking that the trace file is created and contains spans."""
     query = "select count(*) from functional.alltypes"
-    result = self.execute_query_expect_success(
-        self.create_impala_client_from_vector(vector), query)
+    result = self.execute_query_expect_success(self.client, query)
 
     self.__assert_trace(result.query_id, result.runtime_profile, "otel_trace")
 
@@ -64,10 +57,9 @@ class TestOtelTrace(CustomClusterTestSuite):
                    "--otel_file_flush_interval_ms=500 "
                    "--otel_file_pattern={out_dir}/" + TRACE_FILE,
       cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
-  def test_invalid_sql(self, vector):
+  def test_invalid_sql(self):
     query = "select * from functional.alltypes where field_does_not_exist=1"
-    self.execute_query_expect_failure(self.create_impala_client_from_vector(vector),
-        query)
+    self.execute_query_expect_failure(self.client, query)
 
     # Retrieve the query id and runtime profile from the UI since the query execute call
     # only returns a HiveServer2Error object and not the query id or profile.
@@ -90,12 +82,11 @@ class TestOtelTrace(CustomClusterTestSuite):
       cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR],
       disable_log_buffering=True,
       statestored_args="-statestore_heartbeat_frequency_ms=60000")
-  def test_retry_select_success(self, vector):
+  def test_retry_select_success(self):
     query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
     self.cluster.impalads[1].kill()
 
-    result = self.execute_query_expect_success(
-        self.create_impala_client_from_vector(vector), query,
+    result = self.execute_query_expect_success(self.client, query,
         {"RETRY_FAILED_QUERIES": True})
     retried_query_id = parse_query_id(result.runtime_profile)
     orig_query_profile = self.query_profile_from_ui(result.query_id)
@@ -124,36 +115,36 @@ class TestOtelTrace(CustomClusterTestSuite):
       cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR],
       disable_log_buffering=True,
       statestored_args="-statestore_heartbeat_frequency_ms=1000")
-  def test_retry_select_failed(self, vector):
+  def test_retry_select_failed(self):
     # Shuffle heavy query.
     query = "select * from tpch.lineitem t1, tpch.lineitem t2 where " \
       "t1.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey limit 1"
 
-    vector.set_exec_option("retry_failed_queries", "true")
-    client = self.create_impala_client_from_vector(vector)
+    with self.create_impala_client() as client:
+      client.set_configuration({"retry_failed_queries": "true"})
 
-    # Launch a query, it should be retried.
-    handle = self.execute_query_async_using_client(client, query, vector)
-    client.wait_for_impala_state(handle, RUNNING, 60)
-    query_id = client.handle_id(handle)
-    self.cluster.impalads[1].kill()
+      # Launch a query, it should be retried.
+      handle = client.execute_async(query)
+      client.wait_for_impala_state(handle, RUNNING, 60)
+      query_id = client.handle_id(handle)
+      self.cluster.impalads[1].kill()
 
-    # Wait until the retry is running.
-    def __wait_until_retried():
-      return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED"
-    retry(__wait_until_retried, 60, 1, 1, False)
+      # Wait until the retry is running.
+      def __wait_until_retried():
+        return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED"
+      retry(__wait_until_retried, 60, 1, 1, False)
 
-    # Kill another impalad so that another retry is attempted.
-    self.cluster.impalads[2].kill()
+      # Kill another impalad so that another retry is attempted.
+      self.cluster.impalads[2].kill()
 
-    # Wait until the query fails.
-    client.wait_for_impala_state(handle, ERROR, 60)
+      # Wait until the query fails.
+      client.wait_for_impala_state(handle, ERROR, 60)
 
-    retried_query_profile = client.get_runtime_profile(handle)
-    retried_query_id = parse_query_id(retried_query_profile)
-    orig_query_profile = self.query_profile_from_ui(query_id)
+      retried_query_profile = client.get_runtime_profile(handle)
+      retried_query_id = parse_query_id(retried_query_profile)
+      orig_query_profile = self.query_profile_from_ui(query_id)
 
-    client.close_query(handle)
+      client.close_query(handle)
 
     # Assert the trace from the original query.
     self.__assert_trace(
@@ -179,27 +170,24 @@ class TestOtelTrace(CustomClusterTestSuite):
                    "--otel_file_pattern={out_dir}/" + TRACE_FILE + " "
                    "--default_pool_max_requests=1",
       cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
-  def test_select_queued(self, vector):
+  def test_select_queued(self):
     # Launch two queries, the second will be queued until the first completes.
-    client = self.create_impala_client_from_vector(vector)
-
     query = "select * from functional.alltypes where id = 1"
-    handle1 = self.execute_query_async_using_client(client,
-        "{} and int_col = sleep(5000)".format(query), vector)
-    client.wait_for_impala_state(handle1, RUNNING, 60)
-    query_id_1 = client.handle_id(handle1)
+    handle1 = self.client.execute_async("{} and int_col = sleep(5000)".format(query))
+    self.client.wait_for_impala_state(handle1, RUNNING, 60)
+    query_id_1 = self.client.handle_id(handle1)
 
-    handle2 = self.execute_query_async_using_client(client, query, vector)
-    query_id_2 = client.handle_id(handle2)
+    handle2 = self.client.execute_async(query)
+    query_id_2 = self.client.handle_id(handle2)
 
-    client.wait_for_impala_state(handle1, FINISHED, 60)
-    query_profile_1 = client.get_runtime_profile(handle1)
-    client.close_query(handle1)
+    self.client.wait_for_impala_state(handle1, FINISHED, 60)
+    query_profile_1 = self.client.get_runtime_profile(handle1)
+    self.client.close_query(handle1)
 
-    client.wait_for_impala_state(handle2, FINISHED, 60)
-    query_profile_2 = client.get_runtime_profile(handle2)
+    self.client.wait_for_impala_state(handle2, FINISHED, 60)
+    query_profile_2 = self.client.get_runtime_profile(handle2)
 
-    client.close_query(handle2)
+    self.client.close_query(handle2)
 
     self.__assert_trace(
         query_id=query_id_1,
@@ -270,8 +258,8 @@ class TestOtelTrace(CustomClusterTestSuite):
         span_err_msg = query_status
         in_error = True
       self.__assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id,
-          cluster_id, db_user, "default-pool", "default", parse_sql(query_profile),
-          original_query_id)
+          cluster_id, db_user, "default-pool", parse_default_db(query_profile),
+          parse_sql(query_profile), original_query_id)
 
     # Assert Submitted span.
     if "Submitted" not in missing_spans:
@@ -283,15 +271,17 @@ class TestOtelTrace(CustomClusterTestSuite):
 
     # Assert Planning span.
     if "Planning" not in missing_spans:
+      status = INITIALIZED
       span_err_msg = ""
       if err_span == "Planning" or in_error:
         span_err_msg = query_status
+        status = ERROR
         in_error = True
       query_type = parse_query_type(query_profile)
       if query_type == "N/A":
         query_type = "UNKNOWN"
       self.__assert_planningspan_attrs(trace.child_spans, root_span_id, query_id,
-          query_type, span_err_msg)
+          query_type, span_err_msg, status)
 
     # Assert AdmissionControl span.
     if "AdmissionControl" not in missing_spans:
@@ -487,7 +477,7 @@ class TestOtelTrace(CustomClusterTestSuite):
     # Locate the init span and assert.
     init_span = self.__find_span(spans, "Init", query_id)
 
-    self.__assert_scopespan_common(init_span, query_id, False, "Init", 8, "INITIALIZED",
+    self.__assert_scopespan_common(init_span, query_id, False, "Init", 8, INITIALIZED,
         root_span_id)
 
     self.__assert_attr(init_span.name, init_span.attributes, "QueryId", query_id)
@@ -507,10 +497,10 @@ class TestOtelTrace(CustomClusterTestSuite):
 
     submitted_span = self.__find_span(spans, "Submitted", query_id)
     self.__assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0,
-        "INITIALIZED", root_span_id)
+        INITIALIZED, root_span_id)
 
   def __assert_planningspan_attrs(self, spans, root_span_id, query_id, query_type,
-      err_msg="", status="INITIALIZED"):
+      err_msg="", status=INITIALIZED):
     """
       Helper function that asserts the common and span-specific attributes in the
       planning execution span.
diff --git a/tests/util/query_profile_util.py b/tests/util/query_profile_util.py
index 04e76e3d3..2b9827ed7 100644
--- a/tests/util/query_profile_util.py
+++ b/tests/util/query_profile_util.py
@@ -120,3 +120,10 @@ def parse_admission_result(profile_text):
   admission_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
   assert admission_result is not None, "Admission Result not found in query profile"
   return admission_result.group(1)
+
+
+def parse_default_db(profile_text):
+  """Parses the default db from the query profile text."""
+  default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
+  assert default_db is not None, "Default Db not found in query profile"
+  return default_db.group(1)

Reply via email to