This is an automated email from the ASF dual-hosted git repository. jasonmfehr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 75ca356ed0451e1a98f70433a4d2729018b68d32 Author: jasonmfehr <jf...@cloudera.com> AuthorDate: Tue Aug 12 14:30:16 2025 -0700 IMPALA-13237: [Patch 6] - Match Behavior of Older Impala Versions Older versions of Impala set exec_state_ in ClientRequestState to the error state earlier in query processing than later versions do. This difference affects which OpenTelemetry trace child spans report an error status. For example, in older Impala versions, if a query references a column that does not exist, then the Planning child span has a status of ERROR. However, in the latest version, the Planning span has a status of OK, and only the Close span has a status of ERROR. This difference caused the custom cluster test test_otel_trace.py::TestOtelTrace::test_invalid_sql to fail on older Impala versions but pass on the latest version. Additionally, older versions of Impala report a different default db, whereas the latest version reports whatever db the client set. This difference caused test_otel_trace.py::TestOtelTrace::test_retry_select_success and test_otel_trace.py::TestOtelTrace::test_retry_select_failed to fail on older Impala versions because Impala used "tpch" as the default db while the latest version used "default". This change makes the OpenTelemetry trace child span in which an error actually occurs report an error status, matching the behavior of older Impala versions. It also modifies test_otel_trace.py to expect the "DefaultDb" attribute of the OpenTelemetry trace to match the default db reported in the query profile. Testing was accomplished by running the test_otel_trace.py custom cluster tests. 
Change-Id: If57aaef2da6d6904c66d0150f50ea9ac1c3ebc8c Reviewed-on: http://gerrit.cloudera.org:8080/23293 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/observe/span-manager.cc | 7 ++- tests/custom_cluster/test_otel_trace.py | 106 +++++++++++++++----------------- tests/util/query_profile_util.py | 7 +++ 3 files changed, 59 insertions(+), 61 deletions(-) diff --git a/be/src/observe/span-manager.cc b/be/src/observe/span-manager.cc index 3b4218937..38f42d820 100644 --- a/be/src/observe/span-manager.cc +++ b/be/src/observe/span-manager.cc @@ -403,9 +403,6 @@ void SpanManager::EndChildSpan(const Status* cause, current_child_->SetAttribute(a.first, a.second); } - current_child_->SetAttribute(ATTR_STATUS, - ClientRequestState::ExecStateToString(client_request_state_->exec_state())); - const Status* query_status; if (cause != nullptr) { query_status = cause; @@ -415,6 +412,8 @@ void SpanManager::EndChildSpan(const Status* cause, if (query_status->ok()) { current_child_->SetAttributeEmpty(ATTR_ERROR_MSG); + current_child_->SetAttribute(ATTR_STATUS, + ClientRequestState::ExecStateToString(client_request_state_->exec_state())); } else { string error_msg = query_status->msg().msg(); @@ -423,6 +422,8 @@ void SpanManager::EndChildSpan(const Status* cause, } current_child_->SetAttribute(ATTR_ERROR_MSG, error_msg); + current_child_->SetAttribute(ATTR_STATUS, + ClientRequestState::ExecStateToString(ClientRequestState::ExecState::ERROR)); } current_child_->End(); diff --git a/tests/custom_cluster/test_otel_trace.py b/tests/custom_cluster/test_otel_trace.py index 0a948d7f1..25647011c 100644 --- a/tests/custom_cluster/test_otel_trace.py +++ b/tests/custom_cluster/test_otel_trace.py @@ -19,14 +19,13 @@ from __future__ import absolute_import, division, print_function from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.file_utils import 
wait_for_file_line_count -from tests.common.impala_connection import ERROR, RUNNING, FINISHED -from tests.common.test_vector import PROTOCOL, HS2, BEESWAX, ImpalaTestDimension +from tests.common.impala_connection import ERROR, RUNNING, FINISHED, INITIALIZED from tests.util.otel_trace import parse_trace_file, ATTR_VAL_TYPE_STRING, \ ATTR_VAL_TYPE_INT, ATTR_VAL_TYPE_BOOL from tests.util.query_profile_util import parse_db_user, parse_session_id, parse_sql, \ parse_query_type, parse_query_status, parse_impala_query_state, parse_query_id, \ parse_retry_status, parse_original_query_id, parse_retried_query_id, \ - parse_num_rows_fetched, parse_admission_result + parse_num_rows_fetched, parse_admission_result, parse_default_db from tests.util.retry import retry @@ -36,11 +35,6 @@ class TestOtelTrace(CustomClusterTestSuite): OUT_DIR = "out_dir" TRACE_FILE = "export-trace.jsonl" - @classmethod - def add_test_dimensions(cls): - super(TestOtelTrace, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(PROTOCOL, HS2, BEESWAX)) - def setup_method(self, method): super(TestOtelTrace, self).setup_method(method) @@ -49,12 +43,11 @@ class TestOtelTrace(CustomClusterTestSuite): "--otel_trace_exporter=file --otel_file_flush_interval_ms=500 " "--otel_file_pattern={out_dir}/" + TRACE_FILE, cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) - def test_query_success(self, vector): + def test_query_success(self): """Test that OpenTelemetry tracing is working by running a simple query and checking that the trace file is created and contains spans.""" query = "select count(*) from functional.alltypes" - result = self.execute_query_expect_success( - self.create_impala_client_from_vector(vector), query) + result = self.execute_query_expect_success(self.client, query) self.__assert_trace(result.query_id, result.runtime_profile, "otel_trace") @@ -64,10 +57,9 @@ class TestOtelTrace(CustomClusterTestSuite): "--otel_file_flush_interval_ms=500 " 
"--otel_file_pattern={out_dir}/" + TRACE_FILE, cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) - def test_invalid_sql(self, vector): + def test_invalid_sql(self): query = "select * from functional.alltypes where field_does_not_exist=1" - self.execute_query_expect_failure(self.create_impala_client_from_vector(vector), - query) + self.execute_query_expect_failure(self.client, query) # Retrieve the query id and runtime profile from the UI since the query execute call # only returns a HiveServer2Error object and not the query id or profile. @@ -90,12 +82,11 @@ class TestOtelTrace(CustomClusterTestSuite): cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True, statestored_args="-statestore_heartbeat_frequency_ms=60000") - def test_retry_select_success(self, vector): + def test_retry_select_success(self): query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50" self.cluster.impalads[1].kill() - result = self.execute_query_expect_success( - self.create_impala_client_from_vector(vector), query, + result = self.execute_query_expect_success(self.client, query, {"RETRY_FAILED_QUERIES": True}) retried_query_id = parse_query_id(result.runtime_profile) orig_query_profile = self.query_profile_from_ui(result.query_id) @@ -124,36 +115,36 @@ class TestOtelTrace(CustomClusterTestSuite): cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True, statestored_args="-statestore_heartbeat_frequency_ms=1000") - def test_retry_select_failed(self, vector): + def test_retry_select_failed(self): # Shuffle heavy query. 
query = "select * from tpch.lineitem t1, tpch.lineitem t2 where " \ "t1.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey limit 1" - vector.set_exec_option("retry_failed_queries", "true") - client = self.create_impala_client_from_vector(vector) + with self.create_impala_client() as client: + client.set_configuration({"retry_failed_queries": "true"}) - # Launch a query, it should be retried. - handle = self.execute_query_async_using_client(client, query, vector) - client.wait_for_impala_state(handle, RUNNING, 60) - query_id = client.handle_id(handle) - self.cluster.impalads[1].kill() + # Launch a query, it should be retried. + handle = client.execute_async(query) + client.wait_for_impala_state(handle, RUNNING, 60) + query_id = client.handle_id(handle) + self.cluster.impalads[1].kill() - # Wait until the retry is running. - def __wait_until_retried(): - return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED" - retry(__wait_until_retried, 60, 1, 1, False) + # Wait until the retry is running. + def __wait_until_retried(): + return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED" + retry(__wait_until_retried, 60, 1, 1, False) - # Kill another impalad so that another retry is attempted. - self.cluster.impalads[2].kill() + # Kill another impalad so that another retry is attempted. + self.cluster.impalads[2].kill() - # Wait until the query fails. - client.wait_for_impala_state(handle, ERROR, 60) + # Wait until the query fails. 
+ client.wait_for_impala_state(handle, ERROR, 60) - retried_query_profile = client.get_runtime_profile(handle) - retried_query_id = parse_query_id(retried_query_profile) - orig_query_profile = self.query_profile_from_ui(query_id) + retried_query_profile = client.get_runtime_profile(handle) + retried_query_id = parse_query_id(retried_query_profile) + orig_query_profile = self.query_profile_from_ui(query_id) - client.close_query(handle) + client.close_query(handle) # Assert the trace from the original query. self.__assert_trace( @@ -179,27 +170,24 @@ class TestOtelTrace(CustomClusterTestSuite): "--otel_file_pattern={out_dir}/" + TRACE_FILE + " " "--default_pool_max_requests=1", cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) - def test_select_queued(self, vector): + def test_select_queued(self): # Launch two queries, the second will be queued until the first completes. - client = self.create_impala_client_from_vector(vector) - query = "select * from functional.alltypes where id = 1" - handle1 = self.execute_query_async_using_client(client, - "{} and int_col = sleep(5000)".format(query), vector) - client.wait_for_impala_state(handle1, RUNNING, 60) - query_id_1 = client.handle_id(handle1) + handle1 = self.client.execute_async("{} and int_col = sleep(5000)".format(query)) + self.client.wait_for_impala_state(handle1, RUNNING, 60) + query_id_1 = self.client.handle_id(handle1) - handle2 = self.execute_query_async_using_client(client, query, vector) - query_id_2 = client.handle_id(handle2) + handle2 = self.client.execute_async(query) + query_id_2 = self.client.handle_id(handle2) - client.wait_for_impala_state(handle1, FINISHED, 60) - query_profile_1 = client.get_runtime_profile(handle1) - client.close_query(handle1) + self.client.wait_for_impala_state(handle1, FINISHED, 60) + query_profile_1 = self.client.get_runtime_profile(handle1) + self.client.close_query(handle1) - client.wait_for_impala_state(handle2, FINISHED, 60) - query_profile_2 = 
client.get_runtime_profile(handle2) + self.client.wait_for_impala_state(handle2, FINISHED, 60) + query_profile_2 = self.client.get_runtime_profile(handle2) - client.close_query(handle2) + self.client.close_query(handle2) self.__assert_trace( query_id=query_id_1, @@ -270,8 +258,8 @@ class TestOtelTrace(CustomClusterTestSuite): span_err_msg = query_status in_error = True self.__assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id, - cluster_id, db_user, "default-pool", "default", parse_sql(query_profile), - original_query_id) + cluster_id, db_user, "default-pool", parse_default_db(query_profile), + parse_sql(query_profile), original_query_id) # Assert Submitted span. if "Submitted" not in missing_spans: @@ -283,15 +271,17 @@ class TestOtelTrace(CustomClusterTestSuite): # Assert Planning span. if "Planning" not in missing_spans: + status = INITIALIZED span_err_msg = "" if err_span == "Planning" or in_error: span_err_msg = query_status + status = ERROR in_error = True query_type = parse_query_type(query_profile) if query_type == "N/A": query_type = "UNKNOWN" self.__assert_planningspan_attrs(trace.child_spans, root_span_id, query_id, - query_type, span_err_msg) + query_type, span_err_msg, status) # Assert AdmissionControl span. if "AdmissionControl" not in missing_spans: @@ -487,7 +477,7 @@ class TestOtelTrace(CustomClusterTestSuite): # Locate the init span and assert. 
init_span = self.__find_span(spans, "Init", query_id) - self.__assert_scopespan_common(init_span, query_id, False, "Init", 8, "INITIALIZED", + self.__assert_scopespan_common(init_span, query_id, False, "Init", 8, INITIALIZED, root_span_id) self.__assert_attr(init_span.name, init_span.attributes, "QueryId", query_id) @@ -507,10 +497,10 @@ class TestOtelTrace(CustomClusterTestSuite): submitted_span = self.__find_span(spans, "Submitted", query_id) self.__assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0, - "INITIALIZED", root_span_id) + INITIALIZED, root_span_id) def __assert_planningspan_attrs(self, spans, root_span_id, query_id, query_type, - err_msg="", status="INITIALIZED"): + err_msg="", status=INITIALIZED): """ Helper function that asserts the common and span-specific attributes in the planning execution span. diff --git a/tests/util/query_profile_util.py b/tests/util/query_profile_util.py index 04e76e3d3..2b9827ed7 100644 --- a/tests/util/query_profile_util.py +++ b/tests/util/query_profile_util.py @@ -120,3 +120,10 @@ def parse_admission_result(profile_text): admission_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text) assert admission_result is not None, "Admission Result not found in query profile" return admission_result.group(1) + + +def parse_default_db(profile_text): + """Parses the default db from the query profile text.""" + default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text) + assert default_db is not None, "Default Db not found in query profile" + return default_db.group(1)