This is an automated email from the ASF dual-hosted git repository. jasonmfehr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 789991c6cc01819e93521cb9d7703c9a2ec58ce4 Author: jasonmfehr <jf...@cloudera.com> AuthorDate: Wed Aug 6 16:40:09 2025 -0700 IMPALA-13237: [Patch 8] - OpenTelemetry Traces for DML/DDL Queries and Handle Leading Comments Trace DML/DDL Queries * Adds tracing for alter, compute, create, delete, drop, insert, invalidate metadata, and with queries. * Stops tracing beeswax queries since that protocol is deprecated. * Adds Coordinator attribute to Init and Root spans for identifying where the query is running. Comment Handling * Corrects handling of leading comments, both inline and full line. Previously, queries with comments before the first keyword were always ignored. * Adds be ctest tests for determining whether or not a query should be traced. General Improvements * Handles the case where the first query keyword is followed by a newline character or an inline comment (without or with spaces between). * Corrects traces for errored/cancelled queries. These cases short-circuit the normal query processing code path and have to be handled accordingly. * Ends the root span when the query ends instead of waiting for the ClientRequestState to go out of scope. This change removes use-after-free issues caused by reading from ClientRequestState when the SpanManager went out of scope during that object's dtor. * Simplified minimum tls version handling because the validators on the ssl_minimum_version eliminate invalid values that previously had to be accounted for. * Removes the unnecessary otel_trace_enabled() function. * Fixes IMPALA-14314 by waiting for the full trace to be written to the output file before asserting that trace. Testing * Full test suite passed. * ASAN/TSAN builds passed. * Adds new ctest test. * Adds custom cluster tests to assert traces for the new supported query types. * Adds custom cluster tests to assert traces for errored and cancelled queries. 
Generated-by: Github Copilot (Claude Sonnet 3.7) Change-Id: Ie9e83d7f761f3d629f067e0a0602224e42cd7184 Reviewed-on: http://gerrit.cloudera.org:8080/23279 Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-by: Riza Suminto <riza.sumi...@cloudera.com> --- be/CMakeLists.txt | 1 + be/src/observe/CMakeLists.txt | 12 + be/src/observe/otel-flags-trace.cc | 8 +- be/src/observe/otel-test.cc | 182 ++++++ be/src/observe/otel.cc | 97 ++- be/src/observe/otel.h | 9 +- be/src/observe/span-manager.cc | 143 +++-- be/src/observe/span-manager.h | 2 +- be/src/runtime/coordinator.cc | 3 - be/src/runtime/dml-exec-state.cc | 9 + be/src/runtime/dml-exec-state.h | 3 + be/src/runtime/exec-env.cc | 3 +- be/src/service/client-request-state.cc | 26 +- be/src/service/client-request-state.h | 9 +- be/src/service/impala-server.cc | 3 +- tests/common/custom_cluster_test_suite.py | 11 +- tests/common/file_utils.py | 22 +- tests/custom_cluster/test_otel_trace.py | 1001 +++++++++++++++++++++++------ tests/util/otel_trace.py | 68 +- tests/util/query_profile_util.py | 26 +- 20 files changed, 1343 insertions(+), 295 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 9c5fec0ad..88ffb0cc8 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -595,6 +595,7 @@ set (UNIFIED_TEST_LIBS ExprsTests GUtilTests IoTests + OtelTests RpcTests RuntimeTests SchedulingTests diff --git a/be/src/observe/CMakeLists.txt b/be/src/observe/CMakeLists.txt index 73f835e9e..93be4f22a 100644 --- a/be/src/observe/CMakeLists.txt +++ b/be/src/observe/CMakeLists.txt @@ -29,3 +29,15 @@ add_library(Observe timed-span.cc ) add_dependencies(Observe gen-deps) + +if (BUILD_WITH_NO_TESTS) + return() +endif() + +add_library(OtelTests STATIC + otel-test.cc +) + +add_dependencies(OtelTests gen-deps) + +ADD_UNIFIED_BE_LSAN_TEST(otel-test OtelTest.*) diff --git a/be/src/observe/otel-flags-trace.cc b/be/src/observe/otel-flags-trace.cc index 65718fe82..001adb401 100644 --- 
a/be/src/observe/otel-flags-trace.cc +++ b/be/src/observe/otel-flags-trace.cc @@ -61,6 +61,9 @@ DEFINE_validator(otel_trace_exporter, [](const char* flagname, const string& val return false; }); // flag otel_trace_exporter +DEFINE_bool_hidden(otel_trace_beeswax, false, "Specifies whether or not to trace queries " + "submitted via the Beeswax protocol. This flag is hidden because tracing Beeswax " + "queries is not supported."); // // Start of HTTP related flags. @@ -160,10 +163,11 @@ DEFINE_string(otel_trace_tls_minimum_version, "", "String containing the minimum "TLS version, if not specified, defaults to the overall minimum TLS version."); DEFINE_validator(otel_trace_tls_minimum_version, [](const char* flagname, const string& value) { - if (value.empty() || value == "1.2" || value == "1.3") { + if (value.empty() || value == impala::TLSVersions::TLSV1_2 || value == "tlsv1.3") { return true; } - LOG(ERROR) << "Flag '" << flagname << "' must be empty or one of: '1.2', '1.3'."; + LOG(ERROR) << "Flag '" << flagname << "' must be empty or one of: '" + << impala::TLSVersions::TLSV1_2 << "', 'tlsv1.3'."; return false; }); // flag otel_trace_tls_minimum_version diff --git a/be/src/observe/otel-test.cc b/be/src/observe/otel-test.cc new file mode 100644 index 000000000..2a46a22a6 --- /dev/null +++ b/be/src/observe/otel-test.cc @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "observe/otel.h" + +#include <string> +#include <string_view> + +#include <boost/algorithm/string/replace.hpp> +#include <gtest/gtest.h> +#include "gutil/strings/substitute.h" + +#include "gen-cpp/Query_types.h" +#include "testutil/scoped-flag-setter.h" + +using namespace std; +using namespace impala; + +DECLARE_bool(otel_trace_beeswax); + +TEST(OtelTest, QueriesTraced) { + const auto runtest = [](const string_view sql_str) -> void { + string formatted_sql(sql_str); + boost::replace_all(formatted_sql, "\n", "\\n"); + + EXPECT_TRUE(should_otel_trace_query(sql_str, TSessionType::HIVESERVER2)) + << "Query Not Traced: " << formatted_sql; + }; + + runtest("SELECT * FROM foo"); + runtest("WITH alltypes_tiny1 AS (SELECT * FROM functional.alltypes)"); + runtest("INSERT INTO functional.alltypes (id) VALUES (99999)"); + runtest("CREATE TABLE foo.bar (id int, string_col string)"); + runtest("UPDATE foo.bar SET string_col='a'"); + runtest("ALTER TABLE foo.bar ADD COLUMNS (new_col string)"); + runtest("DELETE FROM foo.bar WHERE id=1"); + runtest("COMPUTE STATS foo.bar"); + runtest("COMPUTE INCREMENTAL STATS foo.bar PARTITION (month=1)"); + runtest("INVALIDATE METADATA foo.bar"); + runtest("DROP TABLE foo.bar PURGE"); + runtest("-- comment1\nSELECT 1"); + runtest("-- comment1\n SELECT 1"); + runtest("-- comment1/*comment2*/\nSELECT 1"); + runtest("-- comment1\n/*comment2*/SELECT 1"); + runtest("--comment1\n--comment2\nSELECT 1"); + runtest("--comment1\n --comment2\nSELECT 1"); + runtest("--comment1 \n --comment2\nSELECT 1"); + runtest("--comment1 \n 
--comment2 \n SELECT 1"); + runtest("/*comment1*/SELECT 1"); + runtest("/*comment1*/ SELECT 1"); + runtest("/*comment1*/\nSELECT 1"); + runtest("/*comment1*/ \n SELECT 1"); + runtest("/*comment1*/ \n /* comment 2 */ \n SELECT 1"); + runtest("/*comment1*/ /*comment2*/SELECT 1"); + runtest("/*comment1*/ /*comment2*/SELECT 1"); + runtest("/*comment1*/ SELECT /* inline */ 1"); + runtest("/*comment1*/ SELECT /* inline */ 1"); + runtest("/*comment1*/SELECT /* inline */ 1 /* ending */"); + runtest("/*comment1*/ SELECT /* inline */ 1 /* ending */"); + runtest("/*comment1*/ SELECT /* inline */ 1 /* ending */"); + runtest("/*comment1*/ --comment2 \n SELECT 1"); + runtest("--comment1\nSELECT /* inline */ 1 /* ending */"); + runtest("--comment1 \n SELECT /* inline */ 1 /* ending */"); + runtest("--comment1 \n SELECT /* inline */ 1 /* ending */"); + runtest("--comment1 /*inline*/\nSELECT /* inline */ 1 /* ending */"); + runtest("--comment1 /*inline*/ \n SELECT /* inline */ 1 /* ending */"); + runtest("SELECT 'SELECT'"); + runtest("SELECT `SELECT` from tbl"); + runtest("-- comment1 \n SELECT `SELECT` from tbl"); + runtest("-- comment1 \n --comment2\nSELECT `SELECT` from tbl"); + runtest("/*comment1*/SELECT `SELECT` from tbl"); + runtest("/*comment1*/ \n SELECT `SELECT` from tbl"); + runtest("/*comment1*/ --comment2 \n SELECT `SELECT` from tbl"); + + auto run_newline_test = [&runtest](const string keyword, const string rest) -> void { + runtest(strings::Substitute("$0\n$1", keyword, rest)); + runtest(strings::Substitute("$0 \n$1", keyword, rest)); + runtest(strings::Substitute("$0\n $1", keyword, rest)); + runtest(strings::Substitute("$0 \n $1", keyword, rest)); + runtest(strings::Substitute("/*/ comment */$0 \n $1", keyword, rest)); + runtest(strings::Substitute("$0/* inline comment*/$1", keyword, rest)); + runtest(strings::Substitute("--comment\n$0/* inline comment*/$1", keyword, rest)); + runtest(strings::Substitute("/*comment 1*/\n$0/* inline comment*/$1", keyword, rest)); + 
runtest(strings::Substitute("--comment\n$0 \n$1", keyword, rest)); + runtest(strings::Substitute("/*comment1*/ --comment2\n$0\n$1", keyword, rest)); + runtest(strings::Substitute("/*comment1*/ --comment2\n$0 \n$1", keyword, rest)); + runtest(strings::Substitute("/*comment1*/ --comment2\n $0\n$1", keyword, rest)); + runtest(strings::Substitute("/*comment1*/ --comment2\n $0 \n$1", keyword, rest)); + runtest(strings::Substitute("/*comm1*/ --comm2\n$0/*comm3*/\n$1", keyword, rest)); + }; + + run_newline_test("SELECT", "* FROM FOO"); + run_newline_test("ALTER", "TABLE FOO"); + run_newline_test("COMPUTE", "STATS FOO"); + run_newline_test("CREATE", "TABLE FOO"); + run_newline_test("DELETE", "TABLE FOO"); + run_newline_test("DROP", "TABLE FOO"); + run_newline_test("INSERT", "INTO TABLE FOO"); + run_newline_test("INVALIDATE", "METADATA FOO"); + run_newline_test("WITH", "T1 AS SELECT * FROM FOO"); + + // Beeswax queries are traced when the otel_trace_beeswax flag is set. + { + auto trace_beeswax_setter = + ScopedFlagSetter<bool>::Make(&FLAGS_otel_trace_beeswax,true); + + EXPECT_TRUE(should_otel_trace_query("SELECT * FROM foo", TSessionType::BEESWAX)); + } +} + +TEST(OtelTest, QueriesNotTraced) { + const auto runtest = [](string sql_str) -> void { + string formatted_sql(sql_str); + boost::replace_all(formatted_sql, "\n", "\\n"); + + EXPECT_FALSE(should_otel_trace_query(sql_str, TSessionType::HIVESERVER2)) + << "Query traced but should not be: " << formatted_sql; + }; + + runtest("COMMENT ON DATABASE {} IS 'test'"); + runtest("DESCRIBE {}"); + runtest("EXPLAIN SELECT * FROM {}"); + runtest("REFRESH FUNCTIONS functional"); + runtest("REFRESH\nFUNCTIONS functional"); + runtest("REFRESH \nFUNCTIONS functional"); + runtest("REFRESH\n FUNCTIONS functional"); + runtest("REFRESH \n FUNCTIONS functional"); + runtest("REFRESH TABLE functional"); + runtest("REFRESH\nTABLE functional"); + runtest("REFRESH \nTABLE functional"); + runtest("REFRESH\n TABLE functional"); + runtest("REFRESH 
\n TABLE functional"); + runtest("SET ALL"); + runtest("SHOW TABLES IN {}"); + runtest("SHOW DATABASES"); + runtest("TRUNCATE TABLE {}"); + runtest("USE functional"); + runtest("VALUES (1, 2, 3)"); + runtest("KILL QUERY '1234:5678'"); + runtest("/*comment1*/SET EXPLAIN_LEVEL=0"); + runtest("/*comment1*/ SET EXPLAIN_LEVEL=0"); + runtest("--comment1\nSET EXPLAIN_LEVEL=0"); + runtest("--comment1 \n SET EXPLAIN_LEVEL=0"); + runtest("/* comment1 */--comment1 \n SET EXPLAIN_LEVEL=0"); + runtest("/* comment1 */ --comment1 \n SET EXPLAIN_LEVEL=0"); + runtest("REFRESH AUTHORIZATION"); + runtest("REFRESH\nAUTHORIZATION"); + runtest("REFRESH \nAUTHORIZATION"); + runtest("REFRESH\n AUTHORIZATION"); + runtest("REFRESH \n AUTHORIZATION"); + runtest("/*comment not terminated select 1"); + runtest("/*comment1*/ /*comment 2 not terminated select 1"); + runtest("/*comment only*/"); + runtest("--comment only"); + runtest("--comment only\n"); + runtest("--comment only\n--comment only 2"); + runtest("--comment only\n--comment only 2\n"); + // TODO: Move to the QueriesTraced test case once IMPALA-14370 is fixed. + runtest(strings::Substitute("/*/ comment */select * from tbl")); + + // Beeswax queries are not traced unless the otel_trace_beeswax flag is set. 
+ { + auto trace_beeswax_setter = + ScopedFlagSetter<bool>::Make(&FLAGS_otel_trace_beeswax, false); + + EXPECT_FALSE(should_otel_trace_query("SELECT * FROM foo", TSessionType::BEESWAX)); + } +} diff --git a/be/src/observe/otel.cc b/be/src/observe/otel.cc index 40a870d11..812ce1d8f 100644 --- a/be/src/observe/otel.cc +++ b/be/src/observe/otel.cc @@ -18,8 +18,11 @@ #include "otel.h" #include <chrono> +#include <functional> #include <memory> +#include <regex> #include <string> +#include <string_view> #include <utility> #include <boost/algorithm/string/case_conv.hpp> @@ -50,8 +53,10 @@ #include <opentelemetry/trace/tracer.h> #include <opentelemetry/version.h> +#include "common/compiler-util.h" #include "common/status.h" #include "common/version.h" +#include "gen-cpp/Query_types.h" #include "observe/otel-instrument.h" #include "observe/span-manager.h" #include "service/client-request-state.h" @@ -67,6 +72,7 @@ DECLARE_string(otel_trace_additional_headers); DECLARE_int32(otel_trace_batch_queue_size); DECLARE_int32(otel_trace_batch_max_batch_size); DECLARE_int32(otel_trace_batch_schedule_delay_ms); +DECLARE_bool(otel_trace_beeswax); DECLARE_string(otel_trace_ca_cert_path); DECLARE_string(otel_trace_ca_cert_string); DECLARE_string(otel_trace_collector_url); @@ -89,7 +95,6 @@ DECLARE_int32(otel_trace_timeout_s); DECLARE_string(otel_trace_tls_cipher_suites); DECLARE_bool(otel_trace_tls_insecure_skip_verify); DECLARE_string(otel_trace_tls_minimum_version); -DECLARE_bool(otel_trace_enabled); // Other flags DECLARE_string(ssl_cipher_list); @@ -98,6 +103,26 @@ DECLARE_string(ssl_minimum_version); // Constants static const string SCOPE_SPAN_NAME = "org.apache.impala.impalad.query"; +static const regex query_newline( + "(select|alter|compute|create|delete|drop|insert|invalidate|update|with)\\s*" + "(\n|\\s*\\\\*\\/)", regex::icase | regex::optimize | regex::nosubs); + +// Lambda function to check if SQL starts with relevant keywords for tracing +static const 
function<bool(std::string_view)> is_traceable_sql = + [](std::string_view sql_str) -> bool { + return + LIKELY(boost::algorithm::istarts_with(sql_str, "select ") + || boost::algorithm::istarts_with(sql_str, "alter ") + || boost::algorithm::istarts_with(sql_str, "compute ") + || boost::algorithm::istarts_with(sql_str, "create ") + || boost::algorithm::istarts_with(sql_str, "delete ") + || boost::algorithm::istarts_with(sql_str, "drop ") + || boost::algorithm::istarts_with(sql_str, "insert ") + || boost::algorithm::istarts_with(sql_str, "invalidate ") + || boost::algorithm::istarts_with(sql_str, "update ") + || boost::algorithm::istarts_with(sql_str, "with ")) + || regex_search(sql_str.cbegin(), sql_str.cend(), query_newline); + }; namespace impala { @@ -112,16 +137,64 @@ static inline bool otel_tls_enabled() { || !FLAGS_otel_trace_ca_cert_string.empty() || !FLAGS_otel_trace_tls_minimum_version.empty() || !FLAGS_otel_trace_ssl_ciphers.empty() - || !FLAGS_otel_trace_tls_cipher_suites.empty(); + || !FLAGS_otel_trace_tls_cipher_suites.empty() + || FLAGS_otel_trace_collector_url.find("https://") == 0; } // function otel_tls_enabled -bool otel_trace_enabled() { - return FLAGS_otel_trace_enabled; -} // function otel_trace_enabled +bool should_otel_trace_query(std::string_view sql, + const TSessionType::type& session_type) { + if (LIKELY(!FLAGS_otel_trace_beeswax) && session_type == TSessionType::BEESWAX) { + return false; + } + + if (LIKELY(is_traceable_sql(sql))) { + return true; + } -bool should_otel_trace_query(const char* sql) { - DCHECK(sql != nullptr) << "SQL statement cannot be null."; - return boost::algorithm::istarts_with(sql, "select "); + // Loop until all leading comments and whitespace are skipped. 
+ while (true) { + if (boost::algorithm::istarts_with(sql, "/*")) { + // Handle leading inline comments + size_t end_comment = sql.find("*/"); + if (end_comment != string_view::npos) { + sql = sql.substr(end_comment + 2); + continue; + } + } else if (boost::algorithm::istarts_with(sql, "--")) { + // Handle leading comment lines + size_t end_comment = sql.find("\n"); + if (end_comment != string_view::npos) { + sql = sql.substr(end_comment + 1); + continue; + } + } else if (UNLIKELY(boost::algorithm::istarts_with(sql, " "))) { + // Handle leading whitespace. Since Impala removes leading whitespace from the SQL + // statement, this case only happens if the sql statement starts with inline + // comments or there is a leading space on the first non-comment line. + size_t end_comment = sql.find_first_not_of(" "); + if (end_comment != string_view::npos) { + sql = sql.substr(end_comment); + continue; + } + } else if (boost::algorithm::istarts_with(sql, "\n")) { + // Handling newlines after inline comments. + size_t end_comment = sql.find_first_not_of("\n"); + if (end_comment != string_view::npos) { + sql = sql.substr(end_comment); + continue; + } + } + + // Check if the SQL statement starts with any of the keywords we want to trace + if (LIKELY(is_traceable_sql(sql))) { + return true; + } + + // No more patterns to check + break; + } + + return false; } // function should_otel_trace_query // Initializes an OtlpHttpExporter instance with configuration from global flags. The @@ -158,13 +231,9 @@ static Status init_exporter_http(unique_ptr<SpanExporter>& exporter) { // Set minimum TLS version to the value of the global ssl_minimum_version flag. // Since this flag is in the format "tlsv1.2" or "tlsv1.3", we need to // convert it to the format expected by OtlpHttpExporterOptions. 
- const string min_ssl_ver = to_lower_copy(trim_copy(FLAGS_ssl_minimum_version)); - - if (!min_ssl_ver.empty() && min_ssl_ver.rfind("tlsv", 0) != 0) { - return Status("ssl_minimum_version must start with 'tlsv'"); + if (!FLAGS_ssl_minimum_version.empty()) { + opts.ssl_min_tls = FLAGS_ssl_minimum_version.substr(4); // Remove "tlsv" prefix } - - opts.ssl_min_tls = min_ssl_ver.substr(4); // Remove "tlsv" prefix } else { opts.ssl_min_tls = FLAGS_otel_trace_tls_minimum_version; } diff --git a/be/src/observe/otel.h b/be/src/observe/otel.h index f5591d6cb..025715eb6 100644 --- a/be/src/observe/otel.h +++ b/be/src/observe/otel.h @@ -19,8 +19,10 @@ #include <memory> #include <string> +#include <string_view> #include "common/status.h" +#include "gen-cpp/Query_types.h" #include "observe/span-manager.h" #include "service/client-request-state.h" @@ -37,11 +39,10 @@ const std::string OTEL_EXPORTER_FILE = "file"; const std::string SPAN_PROCESSOR_SIMPLE = "simple"; const std::string SPAN_PROCESSOR_BATCH = "batch"; -// Returns true if OpenTelemetry tracing is enabled, false otherwise. -bool otel_trace_enabled(); - // Returns true if an OpenTelemetry trace needs to be created for the given SQL query. -bool should_otel_trace_query(const char* sql); +// The sql string_view will be trimmed of leading whitespace and comments. +bool should_otel_trace_query(std::string_view sql, + const TSessionType::type& session_type); // Initializes the OpenTelemetry tracer with the configuration defined in the coordinator // startup flags (see otel-flags.cc and otel-flags-trace.cc for the list). 
Does not verify diff --git a/be/src/observe/span-manager.cc b/be/src/observe/span-manager.cc index dbd0ae8d2..f5b21ac15 100644 --- a/be/src/observe/span-manager.cc +++ b/be/src/observe/span-manager.cc @@ -34,9 +34,13 @@ #include "common/compiler-util.h" #include "gen-cpp/Types_types.h" #include "observe/timed-span.h" +#include "runtime/coordinator.h" +#include "runtime/exec-env.h" #include "scheduling/admission-control-client.h" +#include "scheduling/admission-controller.h" #include "service/client-request-state.h" #include "util/debug-util.h" +#include "util/network-util.h" using namespace opentelemetry; using namespace std; @@ -54,6 +58,7 @@ static constexpr char const* ATTR_STATE = "State"; // Names of attributes on both Root and one or more child spans. static constexpr char const* ATTR_CLUSTER_ID = "ClusterId"; +static constexpr char const* ATTR_COORDINATOR = "Coordinator"; static constexpr char const* ATTR_ORIGINAL_QUERY_ID = "OriginalQueryId"; static constexpr char const* ATTR_QUERY_ID = "QueryId"; static constexpr char const* ATTR_QUERY_TYPE = "QueryType"; @@ -155,7 +160,7 @@ SpanManager::~SpanManager() { root_->End(); debug_log_span(root_.get(), "Root", query_id_, false); LOG(INFO) << strings::Substitute("Closed OpenTelemetry trace with trace_id=\"$0\" " - "span_id=\"$1\"", root_->TraceId(), root_->SpanId()); + "span_id=\"$1\" query_id=\"$2\"", root_->TraceId(), root_->SpanId(), query_id_); scope_.reset(); root_.reset(); @@ -176,7 +181,8 @@ void SpanManager::AddChildSpanEvent(const nostd::string_view& name) { LOG(WARNING) << strings::Substitute("Attempted to add event '$0' with no active " "child span trace_id=\"$1\" span_id=\"$2\"\n$3", name.data(), root_->TraceId(), root_->SpanId(), GetStackTrace()); - DCHECK(current_child_) << "Cannot add event when child span is not active."; + DCHECK(current_child_) << strings::Substitute("Cannot add event '$0' when child span " + "is not active.", name.data()); } } // function AddChildSpanEvent @@ -185,7 +191,9 @@ 
void SpanManager::StartChildSpanInit() { ChildSpanBuilder(ChildSpanType::INIT, { {ATTR_CLUSTER_ID, FLAGS_cluster_id}, - {ATTR_QUERY_ID, query_id_} + {ATTR_QUERY_ID, query_id_}, + {ATTR_COORDINATOR, TNetworkAddressToString( + ExecEnv::GetInstance()->configured_backend_address())} }); { @@ -261,10 +269,10 @@ void SpanManager::StartChildSpanAdmissionControl() { ChildSpanBuilder(ChildSpanType::ADMISSION_CONTROL); } // function StartChildSpanAdmissionControl -void SpanManager::EndChildSpanAdmissionControl() { +void SpanManager::EndChildSpanAdmissionControl(const Status& cause) { lock_guard<mutex> l(child_span_mu_); lock_guard<mutex> crs_lock(*(client_request_state_->lock())); - DoEndChildSpanAdmissionControl(); + DoEndChildSpanAdmissionControl(&cause); } // function EndChildSpanAdmissionControl inline void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) { @@ -276,23 +284,36 @@ inline void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) { DCHECK_CHILD_SPAN_TYPE(ChildSpanType::ADMISSION_CONTROL); - bool was_queued = false; - const string* adm_result = nullptr; + bool queued = false; + string adm_result; - if (LIKELY(client_request_state_->summary_profile() != nullptr)) { - adm_result = - client_request_state_->summary_profile()->GetInfoString("Admission result"); + if (LIKELY(client_request_state_->admission_control_client() != nullptr)) { + queued = client_request_state_->admission_control_client()->WasQueued(); } - if (LIKELY(client_request_state_->admission_control_client() != nullptr)) { - was_queued = client_request_state_->admission_control_client()->WasQueued(); + if (LIKELY(client_request_state_->summary_profile() != nullptr)) { + // The case of a query being cancelled while in the admission queue is handled here + // because the summary profile may not be updated by the time this code runs. 
+ if (UNLIKELY(queued && cause != nullptr && cause->code() == TErrorCode::CANCELLED)) { + adm_result = AdmissionController::PROFILE_INFO_VAL_CANCELLED_IN_QUEUE; + } else{ + const string* profile_adm_res = client_request_state_->summary_profile()-> + GetInfoString("Admission result"); + if (UNLIKELY(profile_adm_res == nullptr)) { + // Handle the case where the query closes during admission control before the + // summary profile is updated with the admission result. + adm_result = ""; + } else { + adm_result = *profile_adm_res; + } + } } EndChildSpan( cause, OtelAttributesMap{ - {ATTR_QUEUED, was_queued}, - {ATTR_ADM_RESULT, (adm_result == nullptr ? "" : *adm_result)}, + {ATTR_QUEUED, queued}, + {ATTR_ADM_RESULT, adm_result}, {ATTR_REQUEST_POOL, client_request_state_->request_pool()} }); } // function DoEndChildSpanAdmissionControl @@ -329,11 +350,26 @@ inline void SpanManager::DoEndChildSpanQueryExecution(const Status* cause) { attrs.emplace(ATTR_NUM_DELETED_ROWS, static_cast<int64_t>(0)); attrs.emplace(ATTR_NUM_MODIFIED_ROWS, static_cast<int64_t>(0)); } else { - attrs.emplace(ATTR_NUM_DELETED_ROWS, static_cast<int64_t>(-1)); - attrs.emplace(ATTR_NUM_MODIFIED_ROWS, static_cast<int64_t>(-1)); + int64_t num_deleted_rows = 0; + int64_t num_modified_rows = 0; + + if (client_request_state_->GetCoordinator() != nullptr + && client_request_state_->GetCoordinator()->dml_exec_state() != nullptr) { + num_deleted_rows = + client_request_state_->GetCoordinator()->dml_exec_state()->GetNumDeletedRows(); + num_modified_rows = + client_request_state_->GetCoordinator()->dml_exec_state()->GetNumModifiedRows(); + } + + attrs.emplace(ATTR_NUM_DELETED_ROWS, num_deleted_rows); + attrs.emplace(ATTR_NUM_MODIFIED_ROWS, num_modified_rows); } - attrs.emplace(ATTR_NUM_ROWS_FETCHED, client_request_state_->num_rows_fetched()); + if (client_request_state_->stmt_type() == TStmtType::DDL) { + attrs.emplace(ATTR_NUM_ROWS_FETCHED, 0); + } else { + attrs.emplace(ATTR_NUM_ROWS_FETCHED, 
client_request_state_->num_rows_fetched()); + } EndChildSpan(cause, attrs); } // function DoEndChildSpanQueryExecution @@ -365,42 +401,48 @@ void SpanManager::EndChildSpanClose() { DCHECK_CHILD_SPAN_TYPE(ChildSpanType::CLOSE); lock_guard<mutex> l(child_span_mu_); - lock_guard<mutex> crs_lock(*(client_request_state_->lock())); - EndChildSpan(); - // Set all root span attributes to avoid dereferencing the client_request_state_ in the - // dtor (as the dtor is invoked when client_request_state_ is destroyed). - root_->SetAttribute(ATTR_QUERY_TYPE, - to_string(client_request_state_->exec_request().stmt_type)); + root_->SetAttribute(ATTR_COORDINATOR, TNetworkAddressToString( + ExecEnv::GetInstance()->configured_backend_address())); - if (client_request_state_->query_status().ok()) { - root_->SetAttributeEmpty(ATTR_ERROR_MESSAGE); - } else { - string error_msg = client_request_state_->query_status().msg().msg(); + { + lock_guard<mutex> crs_lock(*(client_request_state_->lock())); + EndChildSpan(); - for (const auto& detail : client_request_state_->query_status().msg().details()) { - error_msg += "\n" + detail; - } + // Set all root span attributes to avoid dereferencing the client_request_state_ in + // the dtor (as the dtor is invoked when client_request_state_ is destroyed). 
+ root_->SetAttribute(ATTR_QUERY_TYPE, + to_string(client_request_state_->exec_request().stmt_type)); - root_->SetAttribute(ATTR_ERROR_MESSAGE, error_msg); - } + if (client_request_state_->query_status().ok()) { + root_->SetAttributeEmpty(ATTR_ERROR_MESSAGE); + } else { + string error_msg = client_request_state_->query_status().msg().msg(); - if (UNLIKELY(client_request_state_->WasRetried())) { - root_->SetAttribute(ATTR_STATE, ClientRequestState::RetryStateToString( - client_request_state_->retry_state())); - root_->SetAttribute(ATTR_RETRIED_QUERY_ID, - PrintId(client_request_state_->retried_id())); - } else { - root_->SetAttribute(ATTR_STATE, - ClientRequestState::ExecStateToString(client_request_state_->exec_state())); - root_->SetAttributeEmpty(ATTR_RETRIED_QUERY_ID); - } + for (const auto& detail : client_request_state_->query_status().msg().details()) { + error_msg += "\n" + detail; + } - if (UNLIKELY(client_request_state_->IsRetriedQuery())) { - root_->SetAttribute(ATTR_ORIGINAL_QUERY_ID, - PrintId(client_request_state_->original_id())); - } else { - root_->SetAttributeEmpty(ATTR_ORIGINAL_QUERY_ID); + root_->SetAttribute(ATTR_ERROR_MESSAGE, error_msg); + } + + if (UNLIKELY(client_request_state_->WasRetried())) { + root_->SetAttribute(ATTR_STATE, ClientRequestState::RetryStateToString( + client_request_state_->retry_state())); + root_->SetAttribute(ATTR_RETRIED_QUERY_ID, + PrintId(client_request_state_->retried_id())); + } else { + root_->SetAttribute(ATTR_STATE, + ClientRequestState::ExecStateToString(client_request_state_->exec_state())); + root_->SetAttributeEmpty(ATTR_RETRIED_QUERY_ID); + } + + if (UNLIKELY(client_request_state_->IsRetriedQuery())) { + root_->SetAttribute(ATTR_ORIGINAL_QUERY_ID, + PrintId(client_request_state_->original_id())); + } else { + root_->SetAttributeEmpty(ATTR_ORIGINAL_QUERY_ID); + } } } // function EndChildSpanClose @@ -408,14 +450,17 @@ inline void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type, 
OtelAttributesMap&& additional_attributes, bool running) { DCHECK(client_request_state_ != nullptr) << "Cannot start child span without a valid " "client request state."; - DCHECK(span_type != ChildSpanType::NONE) << "Span type cannot be " << span_type << "."; + DCHECK(span_type != ChildSpanType::NONE) << strings::Substitute("Span type cannot be " + "'$0'.", to_string(span_type)); if (UNLIKELY(current_child_)) { LOG(WARNING) << strings::Substitute("Attempted to start child span '$0' while " "another child span '$1' is still active trace_id=\"$2\" span_id=\"$3\"\n$4", to_string(span_type), to_string(child_span_type_), root_->TraceId(), root_->SpanId(), GetStackTrace()); - DCHECK(false) << "Should not start a new child span while one is already active."; + DCHECK(false) << strings::Substitute("Should not start child span '$0' when child " + "span '$1' is already active.", to_string(span_type), + to_string(child_span_type_)); { lock_guard<mutex> crs_lock(*(client_request_state_->lock())); diff --git a/be/src/observe/span-manager.h b/be/src/observe/span-manager.h index b47b0c161..643e404b0 100644 --- a/be/src/observe/span-manager.h +++ b/be/src/observe/span-manager.h @@ -92,7 +92,7 @@ public: // client_request_state_->lock(). 
void EndChildSpanInit(); void EndChildSpanSubmitted(); - void EndChildSpanAdmissionControl(); + void EndChildSpanAdmissionControl(const Status& cause); void EndChildSpanQueryExecution(); void EndChildSpanClose(); diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 1f9d0d419..fe338f6f9 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -533,9 +533,6 @@ Status Coordinator::StartBackendExec() { query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends)); parent_query_driver_->SetExecTimeLimit(parent_request_state_); - if (parent_request_state_->otel_trace_query()) { - parent_request_state_->otel_span_manager()->StartChildSpanQueryExecution(); - } // Serialize the TQueryCtx once and pass it to each backend. The serialized buffer must // stay valid until WaitOnExecRpcs() has returned. ThriftSerializer serializer(true); diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc index d54ff27a4..6c5024338 100644 --- a/be/src/runtime/dml-exec-state.cc +++ b/be/src/runtime/dml-exec-state.cc @@ -160,6 +160,15 @@ int64_t DmlExecState::GetNumModifiedRows() { return result; } +int64_t DmlExecState::GetNumDeletedRows() { + lock_guard<mutex> l(lock_); + int64_t result = 0; + for (const PartitionStatusMap::value_type& p : per_partition_status_) { + result += p.second.num_deleted_rows(); + } + return result; +} + bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update, const TFinalizeParams& finalize_params) { lock_guard<mutex> l(lock_); diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h index aa6439edf..e82a1bb09 100644 --- a/be/src/runtime/dml-exec-state.h +++ b/be/src/runtime/dml-exec-state.h @@ -107,6 +107,9 @@ class DmlExecState { /// Return the total number of modified rows across all partitions. int64_t GetNumModifiedRows(); + /// Return the total number of deleted rows across all partitions. 
+ int64_t GetNumDeletedRows(); + /// Populates 'catalog_update' with PartitionStatusMap data. /// Returns true if a catalog update is required, false otherwise. bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update, diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 0e32e1ea8..8370db0f3 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -160,6 +160,7 @@ DECLARE_string(ssl_client_ca_certificate); DECLARE_string(ai_api_key_jceks_secret); DECLARE_string(ai_endpoint); DECLARE_string(ai_additional_platforms); +DECLARE_bool(otel_trace_enabled); DEFINE_int32(backend_client_connection_num_retries, 3, "Retry backend connections."); // When network is unstable, TCP will retry and sending could take longer time. @@ -347,7 +348,7 @@ Status ExecEnv::Init() { LOG(INFO) << "Initializing impalad with backend uuid: " << PrintId(backend_id_); // Initialize OTel - if (FLAGS_is_coordinator && otel_trace_enabled()) { + if (FLAGS_is_coordinator && FLAGS_otel_trace_enabled) { RETURN_IF_ERROR(init_otel_tracer()); } diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 2cad22c66..eb1c53c9e 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -84,6 +84,7 @@ DECLARE_bool(abort_on_failed_audit_event); DECLARE_bool(abort_on_failed_lineage_event); DECLARE_int32(krpc_port); DECLARE_int64(max_result_cache_size); +DECLARE_bool(otel_trace_enabled); DECLARE_bool(use_local_catalog); namespace impala { @@ -124,7 +125,8 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro fetch_rows_timeout_us_(MICROS_PER_MILLI * query_options().fetch_rows_timeout_ms), parent_driver_(query_driver) { - if (otel_trace_enabled() && should_otel_trace_query(sql_stmt().c_str())) { + if (FLAGS_otel_trace_enabled && should_otel_trace_query(sql_stmt(), + query_ctx.session.session_type)) { // initialize OpenTelemetry for this query VLOG(2) << 
"Initializing OpenTelemetry for query " << PrintId(query_id()); otel_span_manager_ = build_span_manager(this); @@ -320,6 +322,9 @@ Status ClientRequestState::Exec() { } case TStmtType::DDL: { DCHECK(exec_req.__isset.catalog_op_request); + if (otel_trace_query()) { + otel_span_manager_->StartChildSpanQueryExecution(); + } LOG_AND_RETURN_IF_ERROR(ExecDdlRequest()); break; } @@ -653,7 +658,8 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() { DCHECK(exec_req.__isset.query_exec_request); UniqueIdPB query_id_pb; TUniqueIdToUniqueIdPB(query_id(), &query_id_pb); - if (otel_trace_query()) { + + if (otel_trace_query() && !IsCTAS()) { otel_span_manager_->StartChildSpanAdmissionControl(); } @@ -663,15 +669,17 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() { summary_profile_, blacklisted_executor_addresses_}, query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_, otel_span_manager_.get()); + + if (otel_trace_query() && !IsCTAS()) { + otel_span_manager_->EndChildSpanAdmissionControl(admit_status); + otel_span_manager_->StartChildSpanQueryExecution(); + } + { lock_guard<mutex> l(lock_); if (!UpdateQueryStatus(admit_status).ok()) return; } - if (otel_trace_query()) { - otel_span_manager_->EndChildSpanAdmissionControl(); - } - DCHECK(schedule_.get() != nullptr); // Note that we don't need to check for cancellation between admission and query // startup. 
The query was not cancelled right before being admitted and the window here @@ -718,7 +726,6 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() { } Status ClientRequestState::ExecDdlRequestImplSync() { - if (catalog_op_type() != TCatalogOpType::DDL && catalog_op_type() != TCatalogOpType::RESET_METADATA) { Status status = ExecLocalCatalogOp(exec_request().catalog_op_request); @@ -1211,6 +1218,9 @@ void ClientRequestState::Finalize(const Status* cause) { if (otel_trace_query()) { otel_span_manager_->AddChildSpanEvent("QueryUnregistered"); otel_span_manager_->EndChildSpanClose(); + + // End the root span and thus the entire trace is also ended. + otel_span_manager_.reset(); } } @@ -1268,7 +1278,7 @@ void ClientRequestState::Wait() { lock_guard<mutex> l(lock_); if (returns_result_set()) { query_events()->MarkEvent("Rows available"); - if (otel_trace_query()) { + if (LIKELY(status.code() != TErrorCode::CANCELLED) && otel_trace_query()) { otel_span_manager_->AddChildSpanEvent("RowsAvailable"); } } else { diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 7ffdb374f..e327907e3 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -826,6 +826,9 @@ class ClientRequestState { /// remote. std::unique_ptr<AdmissionControlClient> admission_control_client_; + /// SpanManager instance for this query. + std::shared_ptr<SpanManager> otel_span_manager_; + /// Executes a local catalog operation (an operation that does not need to execute /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements. Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT; @@ -1016,7 +1019,9 @@ class ClientRequestState { Status TryKillQueryRemotely( const TUniqueId& query_id, const KillQueryRequestPB& request); - /// SpanManager instance for this query. 
- std::shared_ptr<SpanManager> otel_span_manager_; + bool IsCTAS() const { + return catalog_op_type() == TCatalogOpType::DDL + && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT; + } }; } diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index b4fdb61f2..23be7ee4b 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -143,6 +143,7 @@ DECLARE_bool(enable_ldap_auth); DECLARE_bool(enable_workload_mgmt); DECLARE_bool(gen_experimental_profile); DECLARE_bool(use_local_catalog); +DECLARE_bool(otel_trace_enabled); DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served." "If 0 or less, the Beeswax server is not started. This interface is deprecated and " @@ -3618,7 +3619,7 @@ bool ImpalaServer::CancelQueriesForGracefulShutdown() { // Clean up temporary files if needed. ExecEnv::GetInstance()->tmp_file_mgr()->CleanupAtShutdown(); - if (FLAGS_is_coordinator && otel_trace_enabled()) { + if (FLAGS_is_coordinator && FLAGS_otel_trace_enabled) { shutdown_otel_tracer(); } diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index 109ac15f6..ef57e8da7 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -672,7 +672,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): Returns None if no impalad was started.""" return cls.cluster.impalads[0].service if cls.cluster.impalads else None - def query_id_from_ui(self, section, match_func=None, match_query=None, coord_idx=0): + def query_id_from_ui(self, section, match_func=None, match_query=None, coord_idx=0, + not_found_ok=False): """ Calls to the debug UI's queries page and loops over all queries in the specified section calling the provided func for each query, Returns the string id of the @@ -689,6 +690,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): is used instead of match_func. coord_idx: Index of the Impalad to use as the coordinator. 
This is used to determine which impalad's UI to query. + not_found_ok: If True, returns None when no matching query is found. If False, + fails an assert when no matching query is found. Returns: String of the query id of the first matching query or None if no query matches. @@ -712,7 +715,11 @@ class CustomClusterTestSuite(ImpalaTestSuite): query_id = query['query_id'] return query_id, service.read_query_profile_page(query_id) - assert False, "No matching query found in section '{}'".format(section) + if not_found_ok: + return None, None + else: + assert False, "No matching query found in section '{}'".format(section) + def query_profile_from_ui(self, query_id, coord_idx=0): """ diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py index 2f0b70a7c..f0744024d 100644 --- a/tests/common/file_utils.py +++ b/tests/common/file_utils.py @@ -202,10 +202,15 @@ def cleanup_tmp_test_dir(dir_path): shutil.rmtree(dir_path, ignore_errors=True) -def count_lines(file_path): +def count_lines(file_path, missing_ok=False): """Counts the number of lines in the file located at 'file_path'.""" - with open(file_path, 'rb') as file: - return sum(1 for _ in file.readlines()) + try: + with open(file_path, 'rb') as file: + return sum(1 for _ in file.readlines()) + except IOError: + if missing_ok: + return 0 + raise def file_ends_with_char(file_path, char="\n"): @@ -240,7 +245,7 @@ def file_ends_with_char(file_path, char="\n"): def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3, - sleep_time_s=1, backoff=2, last_char="\n"): + sleep_time_s=1, backoff=2, last_char="\n", exact_match=False): """ Waits until the given file contains the expected number of lines or until the timeout is reached. Fails an assert if the timeout is reached before the expected number of lines @@ -254,6 +259,9 @@ def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3, backoff: Backoff factor for exponential backoff (default is 2). 
last_char: Optional character that the file should end with (default is newline). If None, the file is not checked for a specific ending character. + exact_match: If True, the function will assert that the file has exactly the expected + number of lines. If False, it will check if the file has at least the + expected number of lines. Raises: AssertionError: If the file does not reach the expected line count within the given @@ -261,7 +269,11 @@ def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3, character (if provided). """ def assert_trace_file_lines(): - ret = count_lines(file_path) == expected_line_count + if exact_match: + ret = count_lines(file_path) == expected_line_count + else: + ret = count_lines(file_path) >= expected_line_count + if last_char is not None: ret = ret and file_ends_with_char(file_path, last_char) diff --git a/tests/custom_cluster/test_otel_trace.py b/tests/custom_cluster/test_otel_trace.py index 25647011c..e01108b60 100644 --- a/tests/custom_cluster/test_otel_trace.py +++ b/tests/custom_cluster/test_otel_trace.py @@ -17,213 +17,57 @@ from __future__ import absolute_import, division, print_function +from threading import Thread +from time import sleep + +from impala.error import HiveServer2Error from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.file_utils import wait_for_file_line_count -from tests.common.impala_connection import ERROR, RUNNING, FINISHED, INITIALIZED +from tests.common.file_utils import wait_for_file_line_count, count_lines +from tests.common.impala_connection import ERROR, RUNNING, FINISHED, INITIALIZED, PENDING +from tests.common.test_vector import BEESWAX, HS2, ImpalaTestDimension from tests.util.otel_trace import parse_trace_file, ATTR_VAL_TYPE_STRING, \ ATTR_VAL_TYPE_INT, ATTR_VAL_TYPE_BOOL from tests.util.query_profile_util import parse_db_user, parse_session_id, parse_sql, \ parse_query_type, parse_query_status, parse_impala_query_state, 
parse_query_id, \ parse_retry_status, parse_original_query_id, parse_retried_query_id, \ - parse_num_rows_fetched, parse_admission_result, parse_default_db + parse_num_rows_fetched, parse_admission_result, parse_default_db, \ + parse_num_modified_rows, parse_num_deleted_rows, parse_coordinator from tests.util.retry import retry +OUT_DIR = "out_dir_traces" +TRACE_FILE = "export-trace.jsonl" +TRACE_FLAGS = "--otel_trace_enabled=true --otel_trace_exporter=file " \ + "--otel_file_flush_interval_ms=500 " \ + "--otel_file_pattern={out_dir_traces}/" + TRACE_FILE class TestOtelTrace(CustomClusterTestSuite): - """Tests that exercise OpenTelemetry tracing behavior.""" - - OUT_DIR = "out_dir" - TRACE_FILE = "export-trace.jsonl" def setup_method(self, method): super(TestOtelTrace, self).setup_method(method) + self.assert_impalad_log_contains("INFO", "join Impala Service pool") + self.trace_file_path = "{}/{}".format(self.get_tmp_dir(OUT_DIR), TRACE_FILE) + self.trace_file_count = count_lines(self.trace_file_path, True) - @CustomClusterTestSuite.with_args( - impalad_args="-v=2 --cluster_id=otel_trace --otel_trace_enabled=true " - "--otel_trace_exporter=file --otel_file_flush_interval_ms=500 " - "--otel_file_pattern={out_dir}/" + TRACE_FILE, - cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) - def test_query_success(self): - """Test that OpenTelemetry tracing is working by running a simple query and - checking that the trace file is created and contains spans.""" - query = "select count(*) from functional.alltypes" - result = self.execute_query_expect_success(self.client, query) - - self.__assert_trace(result.query_id, result.runtime_profile, "otel_trace") - - @CustomClusterTestSuite.with_args( - impalad_args="-v=2 --cluster_id=test_invalid_sql " - "--otel_trace_enabled=true --otel_trace_exporter=file " - "--otel_file_flush_interval_ms=500 " - "--otel_file_pattern={out_dir}/" + TRACE_FILE, - cluster_size=1, tmp_dir_placeholders=[OUT_DIR], 
disable_log_buffering=True) - def test_invalid_sql(self): - query = "select * from functional.alltypes where field_does_not_exist=1" - self.execute_query_expect_failure(self.client, query) - - # Retrieve the query id and runtime profile from the UI since the query execute call - # only returns a HiveServer2Error object and not the query id or profile. - query_id, profile = self.query_id_from_ui(section="completed_queries", - match_query=query) - - self.__assert_trace( - query_id=query_id, - query_profile=profile, - cluster_id="test_invalid_sql", - trace_cnt=1, - err_span="Planning", - missing_spans=["AdmissionControl", "QueryExecution"]) - - @CustomClusterTestSuite.with_args( - impalad_args="-v=2 --cluster_id=test_retry_select_success " - "--otel_trace_enabled=true --otel_trace_exporter=file " - "--otel_file_flush_interval_ms=500 " - "--otel_file_pattern={out_dir}/" + TRACE_FILE, - cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], - disable_log_buffering=True, - statestored_args="-statestore_heartbeat_frequency_ms=60000") - def test_retry_select_success(self): - query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50" - self.cluster.impalads[1].kill() - - result = self.execute_query_expect_success(self.client, query, - {"RETRY_FAILED_QUERIES": True}) - retried_query_id = parse_query_id(result.runtime_profile) - orig_query_profile = self.query_profile_from_ui(result.query_id) - - # Assert the trace from the original query. - self.__assert_trace( - query_id=result.query_id, - query_profile=orig_query_profile, - cluster_id="test_retry_select_success", - trace_cnt=2, - err_span="QueryExecution") - - # Assert the trace from the retried query. 
- self.__assert_trace( - query_id=retried_query_id, - query_profile=result.runtime_profile, - cluster_id="test_retry_select_success", - trace_cnt=2, - missing_spans=["Submitted", "Planning"]) - - @CustomClusterTestSuite.with_args( - impalad_args="-v=2 --cluster_id=test_retry_select_failed " - "--otel_trace_enabled=true --otel_trace_exporter=file " - "--otel_file_flush_interval_ms=500 " - "--otel_file_pattern={out_dir}/" + TRACE_FILE, - cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], - disable_log_buffering=True, - statestored_args="-statestore_heartbeat_frequency_ms=1000") - def test_retry_select_failed(self): - # Shuffle heavy query. - query = "select * from tpch.lineitem t1, tpch.lineitem t2 where " \ - "t1.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey limit 1" - - with self.create_impala_client() as client: - client.set_configuration({"retry_failed_queries": "true"}) - - # Launch a query, it should be retried. - handle = client.execute_async(query) - client.wait_for_impala_state(handle, RUNNING, 60) - query_id = client.handle_id(handle) - self.cluster.impalads[1].kill() - - # Wait until the retry is running. - def __wait_until_retried(): - return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED" - retry(__wait_until_retried, 60, 1, 1, False) - - # Kill another impalad so that another retry is attempted. - self.cluster.impalads[2].kill() - - # Wait until the query fails. - client.wait_for_impala_state(handle, ERROR, 60) - - retried_query_profile = client.get_runtime_profile(handle) - retried_query_id = parse_query_id(retried_query_profile) - orig_query_profile = self.query_profile_from_ui(query_id) - - client.close_query(handle) - - # Assert the trace from the original query. - self.__assert_trace( - query_id=query_id, - query_profile=orig_query_profile, - cluster_id="test_retry_select_failed", - trace_cnt=3, - err_span="QueryExecution") - - # Assert the trace from the retried query. 
- self.__assert_trace( - query_id=retried_query_id, - query_profile=retried_query_profile, - cluster_id="test_retry_select_failed", - trace_cnt=3, - err_span="QueryExecution", - missing_spans=["Submitted", "Planning"]) - - @CustomClusterTestSuite.with_args( - impalad_args="-v=2 --cluster_id=test_select_queued " - "--otel_trace_enabled=true --otel_trace_exporter=file " - "--otel_file_flush_interval_ms=500 " - "--otel_file_pattern={out_dir}/" + TRACE_FILE + " " - "--default_pool_max_requests=1", - cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) - def test_select_queued(self): - # Launch two queries, the second will be queued until the first completes. - query = "select * from functional.alltypes where id = 1" - handle1 = self.client.execute_async("{} and int_col = sleep(5000)".format(query)) - self.client.wait_for_impala_state(handle1, RUNNING, 60) - query_id_1 = self.client.handle_id(handle1) - - handle2 = self.client.execute_async(query) - query_id_2 = self.client.handle_id(handle2) - - self.client.wait_for_impala_state(handle1, FINISHED, 60) - query_profile_1 = self.client.get_runtime_profile(handle1) - self.client.close_query(handle1) - - self.client.wait_for_impala_state(handle2, FINISHED, 60) - query_profile_2 = self.client.get_runtime_profile(handle2) - - self.client.close_query(handle2) - - self.__assert_trace( - query_id=query_id_1, - query_profile=query_profile_1, - cluster_id="test_select_queued", - trace_cnt=3) - - self.__assert_trace( - query_id=query_id_2, - query_profile=query_profile_2, - cluster_id="test_select_queued", - trace_cnt=3) - - ###################### - # Helper functions. - ###################### - def __assert_trace(self, query_id, query_profile, cluster_id, trace_cnt=1, err_span="", - missing_spans=[]): + def assert_trace(self, query_id, query_profile, cluster_id, trace_cnt=1, err_span="", + missing_spans=[], async_close=False, exact_trace_cnt=False): # Parse common values needed in multiple asserts. 
session_id = parse_session_id(query_profile) db_user = parse_db_user(query_profile) # Wait until all spans are written to the trace file. - trace_file_path = "{}/{}".format(self.get_tmp_dir(self.OUT_DIR), self.TRACE_FILE) wait_for_file_line_count( - file_path=trace_file_path, - expected_line_count=trace_cnt, + file_path=self.trace_file_path, + expected_line_count=trace_cnt + self.trace_file_count, max_attempts=60, sleep_time_s=1, - backoff=1) + backoff=1, + exact_match=exact_trace_cnt) # Remove missing spans from the expected span count. expected_span_count = 6 - len(missing_spans) # Parse the trace json files to get the trace for the query. - trace = parse_trace_file(trace_file_path, query_id) + trace = parse_trace_file(self.trace_file_path, query_id) self.__assert_trace_common(trace, expected_span_count) # Retrieve the query status which contains error messages if the query failed. @@ -246,10 +90,18 @@ class TestOtelTrace(CustomClusterTestSuite): # Error message should follow on all spans after the errored span in_error = False + # Retrieve the coordinator from the query profile. + coordinator = parse_coordinator(query_profile) + + # Parse the query type from the query profile. + query_type = parse_query_type(query_profile) + if query_type == "N/A": + query_type = "UNKNOWN" + # Assert root span. root_span_id = self.__assert_rootspan_attrs(trace.root_span, query_id, session_id, cluster_id, db_user, "default-pool", impala_query_state, query_status, - original_query_id, retried_query_id) + original_query_id, retried_query_id, coordinator) # Assert Init span. 
if "Init" not in missing_spans: @@ -259,7 +111,7 @@ class TestOtelTrace(CustomClusterTestSuite): in_error = True self.__assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id, cluster_id, db_user, "default-pool", parse_default_db(query_profile), - parse_sql(query_profile), original_query_id) + parse_sql(query_profile).replace('\n', ' '), original_query_id, coordinator) # Assert Submitted span. if "Submitted" not in missing_spans: @@ -277,16 +129,19 @@ class TestOtelTrace(CustomClusterTestSuite): span_err_msg = query_status status = ERROR in_error = True - query_type = parse_query_type(query_profile) - if query_type == "N/A": - query_type = "UNKNOWN" self.__assert_planningspan_attrs(trace.child_spans, root_span_id, query_id, query_type, span_err_msg, status) # Assert AdmissionControl span. if "AdmissionControl" not in missing_spans: + status = PENDING + span_err_msg = "" + if err_span == "AdmissionControl" or in_error: + span_err_msg = query_status + status = ERROR + in_error = True self.__assert_admissioncontrol_attrs(trace.child_spans, root_span_id, query_id, - "default-pool", parse_admission_result(query_profile)) + "default-pool", parse_admission_result(query_profile), span_err_msg, status) # Assert QueryExecution span. if "QueryExecution" not in missing_spans: @@ -304,7 +159,7 @@ class TestOtelTrace(CustomClusterTestSuite): span_err_msg = query_status in_error = True self.__assert_close_attrs(trace.child_spans, root_span_id, query_id, span_err_msg, - parse_impala_query_state(query_profile)) + parse_impala_query_state(query_profile), async_close) def __assert_trace_common(self, trace, expected_child_spans_count): """ @@ -447,14 +302,26 @@ class TestOtelTrace(CustomClusterTestSuite): "of type '{}', actual: '{}'".format(span_name, expected_key, expected_type, val.get_type()) + def __assert_span_events(self, span, expected_events=[]): + """ + Helper function to assert that a span contains the expected span events. 
+ """ + assert len(expected_events) == len(span.events), "Span '{}' expected to have " \ + "exactly {} events, actual: {}".format(span.name, len(expected_events), + len(span.events)) + + for event in expected_events: + assert event in span.events, "Expected '{}' event on span '{}' but " \ + "no such events was found.".format(event, span.name) + def __assert_rootspan_attrs(self, span, query_id, session_id, cluster_id, user_name, - request_pool, state, err_msg, original_query_id, retried_query_id): + request_pool, state, err_msg, original_query_id, retried_query_id, coordinator): """ Helper function that asserts the common attributes in the root span. """ root_span_id, _ = self.__find_span_log("Root", query_id) - self.__assert_scopespan_common(span, query_id, True, "Root", 13, "", None, err_msg) + self.__assert_scopespan_common(span, query_id, True, "Root", 14, "", None, err_msg) self.__assert_attr(span.name, span.attributes, "QueryId", query_id) self.__assert_attr(span.name, span.attributes, "SessionId", session_id) @@ -464,11 +331,12 @@ class TestOtelTrace(CustomClusterTestSuite): self.__assert_attr(span.name, span.attributes, "State", state) self.__assert_attr(span.name, span.attributes, "OriginalQueryId", original_query_id) self.__assert_attr(span.name, span.attributes, "RetriedQueryId", retried_query_id) + self.__assert_attr(span.name, span.attributes, "Coordinator", coordinator) return root_span_id def __assert_initspan_attrs(self, spans, root_span_id, query_id, session_id, cluster_id, - user_name, request_pool, default_db, query_string, original_query_id): + user_name, request_pool, default_db, query_string, original_query_id, coordinator): """ Helper function that asserts the common and span-specific attributes in the init span. @@ -477,7 +345,7 @@ class TestOtelTrace(CustomClusterTestSuite): # Locate the init span and assert. 
init_span = self.__find_span(spans, "Init", query_id) - self.__assert_scopespan_common(init_span, query_id, False, "Init", 8, INITIALIZED, + self.__assert_scopespan_common(init_span, query_id, False, "Init", 9, INITIALIZED, root_span_id) self.__assert_attr(init_span.name, init_span.attributes, "QueryId", query_id) @@ -489,6 +357,9 @@ class TestOtelTrace(CustomClusterTestSuite): self.__assert_attr(init_span.name, init_span.attributes, "QueryString", query_string) self.__assert_attr(init_span.name, init_span.attributes, "OriginalQueryId", original_query_id) + self.__assert_attr(init_span.name, init_span.attributes, "Coordinator", coordinator) + + self.__assert_span_events(init_span) def __assert_submittedspan_attrs(self, spans, root_span_id, query_id): """ @@ -499,6 +370,8 @@ class TestOtelTrace(CustomClusterTestSuite): self.__assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0, INITIALIZED, root_span_id) + self.__assert_span_events(submitted_span) + def __assert_planningspan_attrs(self, spans, root_span_id, query_id, query_type, err_msg="", status=INITIALIZED): """ @@ -512,14 +385,17 @@ class TestOtelTrace(CustomClusterTestSuite): self.__assert_attr(planning_span.name, planning_span.attributes, "QueryType", query_type) + self.__assert_span_events(planning_span) + def __assert_admissioncontrol_attrs(self, spans, root_span_id, query_id, request_pool, - adm_result, err_msg="", status="PENDING"): + adm_result, err_msg, status): """ Helper function that asserts the common and span-specific attributes in the admission control span. 
""" - queued = False if adm_result == "Admitted immediately" else True + queued = False if adm_result == "Admitted immediately" \ + or adm_result == "Admitted as a trivial query" else True adm_ctrl_span = self.__find_span(spans, "AdmissionControl", query_id) self.__assert_scopespan_common(adm_ctrl_span, query_id, False, "AdmissionControl", 3, @@ -531,6 +407,11 @@ class TestOtelTrace(CustomClusterTestSuite): self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "RequestPool", request_pool) + if queued: + self.__assert_span_events(adm_ctrl_span, ["Queued"]) + else: + self.__assert_span_events(adm_ctrl_span) + def __assert_query_exec_attrs(self, spans, query_profile, root_span_id, query_id, err_msg, status): """ @@ -541,15 +422,17 @@ class TestOtelTrace(CustomClusterTestSuite): query_exec_span = self.__find_span(spans, "QueryExecution", query_id) self.__assert_scopespan_common(query_exec_span, query_id, False, "QueryExecution", 3, status, root_span_id, err_msg) - self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumDeletedRows", - 0, "intValue") self.__assert_attr(query_exec_span.name, query_exec_span.attributes, - "NumModifiedRows", 0, "intValue") + "NumModifiedRows", parse_num_modified_rows(query_profile), "intValue") + self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumDeletedRows", + parse_num_deleted_rows(query_profile), "intValue") self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumRowsFetched", parse_num_rows_fetched(query_profile), "intValue") - def __assert_close_attrs(self, spans, root_span_id, query_id, - err_msg="", status=FINISHED): + # TODO: IMPALA-14334 - Assert QueryExecution span events + + def __assert_close_attrs(self, spans, root_span_id, query_id, err_msg, status, + async_close): """ Helper function that asserts the common and span-specific attributes in the close span. 
@@ -559,6 +442,11 @@ class TestOtelTrace(CustomClusterTestSuite): self.__assert_scopespan_common(close_span, query_id, False, "Close", 0, status, root_span_id, err_msg) + expected_events = ["QueryUnregistered"] + if async_close and "ReleasedAdmissionControlResources" in close_span.events: + expected_events.append("ReleasedAdmissionControlResources") + self.__assert_span_events(close_span, expected_events) + def __find_span(self, spans, name, query_id): """ Helper function to find a span by name in a list of OtelSpan objects. @@ -569,3 +457,720 @@ class TestOtelTrace(CustomClusterTestSuite): return s assert False, "Span '{}' not found for query '{}'".format(name, query_id) + + +@CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=select_dml {}".format(TRACE_FLAGS), + cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) +class TestOtelTraceSelectsDMLs(TestOtelTrace): + """Tests that exercise OpenTelemetry tracing behavior for select and dml queries.""" + + def setup_method(self, method): + super(TestOtelTraceSelectsDMLs, self).setup_method(method) + self.client.clear_configuration() + + def test_beeswax_no_trace(self): + """Since tracing Beeswax queries is not supported, tests that no trace is created + when executing a query using the Beeswax protocol.""" + with self.create_impala_client(protocol=BEESWAX) as client: + self.execute_query_expect_success(client, + "SELECT COUNT(*) FROM functional.alltypes") + sleep(2) # Wait for any spans to be flushed to the trace file. 
+ + line_count = count_lines(self.trace_file_path, True) + assert line_count - self.trace_file_count == 0 + + def test_query_success(self): + """Test that OpenTelemetry tracing is working by running a simple query and + checking that the trace file is created and contains spans.""" + result = self.execute_query_expect_success(self.client, + "SELECT COUNT(*) FROM functional.alltypes") + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml") + + def test_invalid_sql(self): + query = "SELECT * FROM functional.alltypes WHERE field_does_not_exist=1" + self.execute_query_expect_failure(self.client, query) + + # Retrieve the query id and runtime profile from the UI since the query execute call + # only returns a HiveServer2Error object and not the query id or profile. + query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=query) + + self.assert_trace( + query_id=query_id, + query_profile=profile, + cluster_id="select_dml", + trace_cnt=1, + err_span="Planning", + missing_spans=["AdmissionControl", "QueryExecution"]) + + def test_cte_query_success(self): + """Test that OpenTelemetry tracing is working by running a simple query that uses a + common table expression and checking that the trace file is created and contains + expected spans with the expected attributes.""" + result = self.execute_query_expect_success(self.client, + "WITH alltypes_tiny1 AS (SELECT * FROM functional.alltypes WHERE tinyint_col=1 " + "LIMIT 10) SELECT id FROM alltypes_tiny1") + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml") + + def test_query_exec_fail(self): + """Test that OpenTelemetry tracing is working by running a query that fails during + execution and checking that the trace file is created and contains expected spans + with the expected attributes.""" + query = "SELECT 1 FROM functional.alltypessmall a JOIN functional.alltypessmall 
b " \ + "ON a.id != b.id" + self.execute_query_expect_failure(self.client, + query, + { + "abort_on_error": 1, + "batch_size": 0, + "debug_action": "3:PREPARE:FAIL|COORD_BEFORE_EXEC_RPC:SLEEP@5000", + "disable_codegen": 0, + "disable_codegen_rows_threshold": 0, + "exec_single_node_rows_threshold": 0, + "mt_dop": 4, + "num_nodes": 0, + "test_replan": 1 + }) + + query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=query) + + self.assert_trace( + query_id=query_id, + query_profile=profile, + cluster_id="select_dml", + err_span="QueryExecution", + async_close=True) + + def test_select_timeout(self): + query = "SELECT * FROM functional.alltypes WHERE id=sleep(5000)" + self.execute_query_expect_failure(self.client, query, {"exec_time_limit_s": "1"}) + + # Retrieve the query id and runtime profile from the UI since the query execute call + # only returns a HiveServer2Error object and not the query id or profile. + query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=query) + + self.assert_trace( + query_id=query_id, + query_profile=profile, + cluster_id="select_dml", + trace_cnt=1, + err_span="QueryExecution", + async_close=True) + + def test_select_empty(self): + self.execute_query_expect_failure(self.client, "") + + # Run a query that will succeed to ensure all traces have been flushed. 
+ result = self.execute_query_expect_success(self.client, "select 1") + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=1, + err_span="QueryExecution", + async_close=True) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + exact_trace_cnt=True) + + def test_select_union(self): + result = self.execute_query_expect_success(self.client, "SELECT * FROM " + "functional.alltypes LIMIT 5 UNION ALL SELECT * FROM functional.alltypessmall") + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + exact_trace_cnt=True) + + def test_dml_timeout(self): + query = "INSERT INTO functional.alltypes (id, string_col, year, month) VALUES " \ + "(99999, 'foo', 2025, 1)" + self.execute_query_expect_failure(self.client, query, + {"debug_action": "CRS_AFTER_COORD_STARTS:SLEEP@3000", "exec_time_limit_s": "1"}) + + # Retrieve the query id and runtime profile from the UI since the query execute call + # only returns a HiveServer2Error object and not the query id or profile. + query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=query) + + self.assert_trace( + query_id=query_id, + query_profile=profile, + cluster_id="select_dml", + trace_cnt=1, + err_span="QueryExecution", + async_close=True) + + def test_dml_update(self, unique_database, unique_name): + # IMPALA-14340: Cannot update a table that has the same name as the database. 
+ tbl = "{}.{}_tbl".format(unique_database, unique_name) + + self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, " + "string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')" + .format(tbl)) + + self.execute_query_expect_success(self.client, "INSERT INTO {} (id, string_col) " + "VALUES (1, 'foo'), (2, 'bar')".format(tbl)) + + result = self.execute_query_expect_success(self.client, + "UPDATE {} SET string_col='a' WHERE id=1".format(tbl)) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=4) + + def test_dml_delete(self, unique_database, unique_name): + # IMPALA-14340: Cannot delete from a table that has the same name as the database. + tbl = "{}.{}_tbl".format(unique_database, unique_name) + + self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, " + "string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')" + .format(tbl)) + + self.execute_query_expect_success(self.client, "INSERT INTO {} (id, string_col) " + "VALUES (1, 'foo'), (2, 'bar')".format(tbl)) + + result = self.execute_query_expect_success(self.client, + "DELETE FROM {} WHERE id=1".format(tbl)) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=4) + + def test_dml_delete_join(self, unique_database, unique_name): + tbl1 = "{}.{}_1".format(unique_database, unique_name) + self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG " + "TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, month " + "FROM functional.alltypes ORDER BY id limit 100".format(tbl1)) + + tbl2 = "{}.{}_2".format(unique_database, unique_name) + self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG " + "TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, month " + "FROM functional.alltypessmall ORDER BY id LIMIT 
100".format(tbl2)) + + result = self.execute_query_expect_success(self.client, "DELETE t1 FROM {} t1 JOIN " + "{} t2 ON t1.id = t2.id WHERE t2.id < 5".format(tbl1, tbl2)) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=5) + + def test_ignored_queries(self, unique_database, unique_name): + """Asserts queries that should not generate traces do not generate traces.""" + tbl = "{}.{}".format(unique_database, unique_name) + res_create = self.execute_query_expect_success(self.client, + "CREATE TABLE {} (a int)".format(tbl)) + + # These queries are not expected to have traces created for them. + ignore_queries = [ + "COMMENT ON DATABASE {} IS 'test'".format(unique_database), + "DESCRIBE {}".format(tbl), + "EXPLAIN SELECT * FROM {}".format(tbl), + "REFRESH FUNCTIONS functional", + "REFRESH functional.alltypes", + "SET ALL", + "SHOW TABLES IN {}".format(unique_database), + "SHOW DATABASES", + "TRUNCATE TABLE {}".format(tbl), + "USE functional", + "VALUES (1, 2, 3)", + "/*comment1*/SET EXPLAIN_LEVEL=0", + "--comment1\nSET EXPLAIN_LEVEL=0" + ] + + for query in ignore_queries: + self.execute_query_expect_success(self.client, query) + + # REFRESH AUTHORIZATION will error if authorization is not enabled. Since the test is + # only asserting that no trace is created, ignore any errors from the query. + try: + self.execute_query_expect_success(self.client, "REFRESH AUTHORIZATION") + except HiveServer2Error: + pass + + # Run one more query that is expected to have a trace created to ensure that all + # traces have been flushed to the trace file. + res_drop = self.execute_query_expect_success(self.client, "DROP TABLE {} PURGE" + .format(tbl)) + + # Ensure the expected number of traces are present in the trace file. + # The expected line count is 4 because: + # 1. unique_database fixture runs drop database + # 2. unique_database fixture runs create database + # 3. test runs create table + # 4. 
test runs drop table + wait_for_file_line_count(file_path=self.trace_file_path, + expected_line_count=4 + self.trace_file_count, max_attempts=10, sleep_time_s=1, + backoff=1, exact_match=True) + + # Assert the traces for the create/drop table query to ensure both were created. + self.assert_trace( + query_id=res_create.query_id, + query_profile=res_create.runtime_profile, + cluster_id="select_dml", + trace_cnt=4, + missing_spans=["AdmissionControl"]) + + self.assert_trace( + query_id=res_drop.query_id, + query_profile=res_drop.runtime_profile, + cluster_id="select_dml", + trace_cnt=4, + missing_spans=["AdmissionControl"]) + + def test_dml_insert_success(self, unique_database, unique_name): + self.execute_query_expect_success(self.client, + "CREATE TABLE {}.{} (id int, string_col string, int_col int)" + .format(unique_database, unique_name)) + + result = self.execute_query_expect_success(self.client, + "INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), (2, 'b', 20), " + "(3, 'c', 30)".format(unique_database, unique_name)) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=4) + + def test_dml_insert_cte_success(self, unique_database, unique_name): + self.execute_query_expect_success(self.client, + "CREATE TABLE {}.{} (id int)".format(unique_database, unique_name)) + + result = self.execute_query_expect_success(self.client, + "WITH a1 AS (SELECT * FROM functional.alltypes WHERE tinyint_col=1 limit 10) " + "INSERT INTO {}.{} SELECT id FROM a1".format(unique_database, unique_name)) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=4) + + def test_dml_insert_overwrite(self, unique_database, unique_name): + """Test that OpenTelemetry tracing is working by running an insert overwrite query and + checking that the trace file is created and contains expected spans with the + expected attributes.""" + 
self.execute_query_expect_success(self.client, + "CREATE TABLE {}.{} AS SELECT * FROM functional.alltypes WHERE id < 500 ".format( + unique_database, unique_name)) + + result = self.execute_query_expect_success(self.client, + "INSERT OVERWRITE TABLE {}.{} SELECT * FROM functional.alltypes WHERE id > 500 " + "AND id < 1000".format(unique_database, unique_name)) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="select_dml", + trace_cnt=4) + + +class TestOtelTraceSelectQueued(TestOtelTrace): + """Tests that require setting additional startup flags to assert admission control + queueing behavior. The cluster must be restarted after each test to apply the + new flags.""" + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=select_queued --default_pool_max_requests=1 {}" + .format(TRACE_FLAGS), + cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) + def test_select_queued(self): + # Launch two queries, the second will be queued until the first completes. 
+ query = "SELECT * FROM functional.alltypes WHERE id = 1" + handle1 = self.client.execute_async("{} AND int_col = SLEEP(5000)".format(query)) + self.client.wait_for_impala_state(handle1, RUNNING, 60) + query_id_1 = self.client.handle_id(handle1) + + handle2 = self.client.execute_async(query) + query_id_2 = self.client.handle_id(handle2) + + self.client.wait_for_impala_state(handle1, FINISHED, 60) + self.client.fetch(None, handle1) + query_profile_1 = self.client.get_runtime_profile(handle1) + self.client.close_query(handle1) + + self.client.wait_for_impala_state(handle2, FINISHED, 60) + self.client.fetch(None, handle2) + query_profile_2 = self.client.get_runtime_profile(handle2) + + self.client.close_query(handle2) + + self.assert_trace( + query_id=query_id_1, + query_profile=query_profile_1, + cluster_id="select_queued", + trace_cnt=3) + + self.assert_trace( + query_id=query_id_2, + query_profile=query_profile_2, + cluster_id="select_queued", + trace_cnt=3) + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=cancel_adm_ctl --default_pool_max_requests=1 {}" + .format(TRACE_FLAGS), cluster_size=1, tmp_dir_placeholders=[OUT_DIR], + disable_log_buffering=True) + def test_cancel_during_admission_control(self): + """Tests that a query that is queued in admission control can be cancelled and + the trace is created with the expected spans and events.""" + # Start a long-running query that will take up the only admission control slot. + self.client.execute_async("SELECT * FROM functional.alltypes WHERE id = " + "SLEEP(5000)") + + # Start a second query that will be queued and then cancelled. + handle2 = self.client.execute_async("SELECT * FROM functional.alltypes") + self.client.wait_for_impala_state(handle2, PENDING, 30) + query_id = self.client.handle_id(handle2) + + # Cancel the second query while it is queued. 
+ self.execute_query_expect_success(self.client, "KILL QUERY '{}'".format(query_id)) + self.client.wait_for_impala_state(handle2, ERROR, 30) + query_profile = self.client.get_runtime_profile(handle2) + + self.assert_trace( + query_id=query_id, + query_profile=query_profile, + cluster_id="cancel_adm_ctl", + trace_cnt=1, + err_span="AdmissionControl", + missing_spans=["QueryExecution"], + async_close=True) + + +class TestOtelTraceSelectRetry(TestOtelTrace): + """Tests the require ending an Impala daemon and thus the cluster must restart after + each test.""" + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=retry_select_success {}".format(TRACE_FLAGS), + cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], + disable_log_buffering=True, + statestored_args="-statestore_heartbeat_frequency_ms=60000") + def test_retry_select_success(self): + self.cluster.impalads[1].kill() + + result = self.execute_query_expect_success(self.client, + "SELECT COUNT(*) FROM tpch_parquet.lineitem WHERE l_orderkey < 50", + {"RETRY_FAILED_QUERIES": True}) + retried_query_id = parse_query_id(result.runtime_profile) + orig_query_profile = self.query_profile_from_ui(result.query_id) + + # Assert the trace from the original query. + self.assert_trace( + query_id=result.query_id, + query_profile=orig_query_profile, + cluster_id="retry_select_success", + trace_cnt=2, + err_span="QueryExecution", + async_close=True) + + # Assert the trace from the retried query. 
+ self.assert_trace( + query_id=retried_query_id, + query_profile=result.runtime_profile, + cluster_id="retry_select_success", + trace_cnt=2, + missing_spans=["Submitted", "Planning"], + async_close=True) + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=retry_select_failed {}".format(TRACE_FLAGS), + cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], + disable_log_buffering=True, + statestored_args="-statestore_heartbeat_frequency_ms=1000") + def test_retry_select_failed(self): + + with self.create_impala_client() as client: + client.set_configuration({"retry_failed_queries": "true"}) + + # Launch a shuffle heavy query, it should be retried. + handle = client.execute_async("SELECT * FROM tpch.lineitem t1, tpch.lineitem " + "t2 WHERE t1.l_orderkey = t2.l_orderkey ORDER BY t1.l_orderkey, t2.l_orderkey " + "LIMIT 1") + client.wait_for_impala_state(handle, RUNNING, 60) + query_id = client.handle_id(handle) + self.cluster.impalads[1].kill() + + # Wait until the retry is running. + def __wait_until_retried(): + return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED" + retry(__wait_until_retried, 60, 1, 1, False) + + # Kill another impalad so that another retry is attempted. + self.cluster.impalads[2].kill() + + # Wait until the query fails. + client.wait_for_impala_state(handle, ERROR, 60) + + retried_query_profile = client.get_runtime_profile(handle) + retried_query_id = parse_query_id(retried_query_profile) + orig_query_profile = self.query_profile_from_ui(query_id) + + client.close_query(handle) + + # Assert the trace from the original query. + self.assert_trace( + query_id=query_id, + query_profile=orig_query_profile, + cluster_id="retry_select_failed", + trace_cnt=2, + err_span="QueryExecution", + async_close=True) + + # Assert the trace from the retried query. 
+ self.assert_trace( + query_id=retried_query_id, + query_profile=retried_query_profile, + cluster_id="retry_select_failed", + trace_cnt=2, + err_span="QueryExecution", + missing_spans=["Submitted", "Planning"], + async_close=True) + + +@CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=trace_ddl {}".format(TRACE_FLAGS), + cluster_size=2, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) +class TestOtelTraceDDLs(TestOtelTrace): + """Tests that exercise OpenTelemetry tracing behavior on DDLs. These tests are in their + own class because they require an additional test dimension for async DDLs""" + + @classmethod + def add_test_dimensions(cls): + super(TestOtelTraceDDLs, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('async_ddl', True, False)) + + def test_ddl_createdb(self, vector, unique_name): + try: + result = self.execute_query_expect_success(self.client, + "CREATE DATABASE {}".format(unique_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="trace_ddl", + missing_spans=["AdmissionControl"]) + finally: + result = self.execute_query_expect_success(self.client, + "DROP DATABASE IF EXISTS {}".format(unique_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=2, + missing_spans=["AdmissionControl"]) + + def test_ddl_create_alter_table(self, vector, unique_database, unique_name): + """Tests that traces are created for a successful create table, a successful alter + table, and a failed alter table (adding a column that already exists).""" + create_result = self.execute_query_expect_success(self.client, + "CREATE TABLE {}.{} (id int, string_col string, int_col int)" + .format(unique_database, unique_name), + {"ENABLE_ASYNC_DDL_EXECUTION": 
vector.get_value('async_ddl')}) + + alter_success_result = self.execute_query_expect_success(self.client, "ALTER TABLE " + "{}.{} ADD COLUMNS (new_col string)".format(unique_database, unique_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + fail_query = "ALTER TABLE {}.{} ADD COLUMNS (new_col string)" \ + .format(unique_database, unique_name) + self.execute_query_expect_failure(self.client, fail_query, + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + fail_query_id, fail_profile = self.query_id_from_ui(section="completed_queries", + match_query=fail_query) + + self.assert_trace( + query_id=create_result.query_id, + query_profile=create_result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=5, + missing_spans=["AdmissionControl"]) + + self.assert_trace( + query_id=alter_success_result.query_id, + query_profile=alter_success_result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=5, + missing_spans=["AdmissionControl"]) + + self.assert_trace( + query_id=fail_query_id, + query_profile=fail_profile, + cluster_id="trace_ddl", + trace_cnt=5, + missing_spans=["AdmissionControl", "QueryExecution"], + err_span="Planning") + + def test_ddl_createtable_fail(self, vector, unique_name): + with self.create_client_for_nth_impalad(1, HS2) as second_coord_client: + try: + # Create a database to use for this test. Cannot use the unique_database fixture + # because we want to drop the database after planning but before execution and + # that fixture drops the database without the "if exists" clause. + self.execute_query_expect_success(self.client, "CREATE DATABASE {}" + .format(unique_name)) + + with self.create_client_for_nth_impalad(0, HS2) as first_coord_client: + # In a separate thread, run the create table DDL that will fail. 
+ fail_query = "CREATE TABLE {}.{} (id int, string_col string, int_col int)" \ + .format(unique_name, unique_name) + + def execute_query_fail(): + self.execute_query_expect_failure(first_coord_client, fail_query, + {"debug_action": "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@5000", + "ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + thread = Thread(target=execute_query_fail) + thread.daemon = True + thread.start() + + # Wait until the create table query is in flight. + fail_query_id = None + while fail_query_id is None: + fail_query_id, profile = self.query_id_from_ui(section="in_flight_queries", + match_query=fail_query, not_found_ok=True) + if fail_query_id is not None and len(profile.strip()) > 0 \ + and parse_impala_query_state(profile) == "RUNNING": + break + sleep(0.1) + + # Drop the database after planning to cause the create table to fail. + self.execute_query_expect_success(second_coord_client, + "DROP DATABASE {} CASCADE".format(unique_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + # Wait until the create table query fails. + thread.join() + fail_profile_str = self.query_profile_from_ui(fail_query_id) + finally: + self.execute_query_expect_success(second_coord_client, + "DROP DATABASE IF EXISTS {} CASCADE".format(unique_name)) + + # Assert the errored query. 
+ self.assert_trace( + query_id=fail_query_id, + query_profile=fail_profile_str, + cluster_id="trace_ddl", + trace_cnt=4, + missing_spans=["AdmissionControl"], + err_span="QueryExecution") + + def test_ddl_createtable_cte_success(self, vector, unique_database, unique_name): + result = self.execute_query_expect_success(self.client, + "CREATE TABLE {}.{} AS WITH a1 AS (SELECT * FROM functional.alltypes WHERE " + "tinyint_col=1 LIMIT 10) SELECT id FROM a1".format(unique_database, unique_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=3, + missing_spans=["AdmissionControl"]) + + def test_compute_stats(self, vector, unique_database, unique_name): + """The compute stats queries are a special case. These statements run two separate + select queries. Locate both select queries on the UI and assert their traces.""" + + tbl_name = "{}.{}_alltypes".format(unique_database, unique_name) + + # Setup a test table to ensure calculating stats on an existing table does not impact + # other tests. 
+ self.execute_query_expect_success(self.client, "CREATE TABLE {} PARTITIONED BY " + "(year, month) AS SELECT * FROM functional.alltypes".format(tbl_name)) + + compute_result = self.execute_query_expect_success(self.client, + "COMPUTE STATS {}".format(tbl_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + query_id_cnt, profile_cnt = self.query_id_from_ui(section="completed_queries", + match_query="SELECT COUNT(*), `year`, `month` FROM {} GROUP BY " + "`year`, `month`".format(tbl_name)) + + query_id_ndv, profile_ndv = self.query_id_from_ui(section="completed_queries", + match_func=lambda q: q['stmt'].startswith("SELECT NDV(id)")) + + self.assert_trace( + query_id=compute_result.query_id, + query_profile=compute_result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=4, + missing_spans=["AdmissionControl"]) + + self.assert_trace( + query_id=query_id_cnt, + query_profile=profile_cnt, + cluster_id="trace_ddl", + trace_cnt=4) + + self.assert_trace( + query_id=query_id_ndv, + query_profile=profile_ndv, + cluster_id="trace_ddl", + trace_cnt=4) + + def test_compute_incremental_stats(self, vector, unique_database, unique_name): + tbl_name = "{}.{}_alltypes".format(unique_database, unique_name) + + # Setup a test table to ensure calculating stats on an existing table does not impact + # other tests. + self.execute_query_expect_success(self.client, "CREATE TABLE {} PARTITIONED BY " + "(year, month) AS SELECT * FROM functional.alltypes".format(tbl_name)) + + result = self.execute_query_expect_success(self.client, + "COMPUTE INCREMENTAL STATS {} PARTITION (month=1)".format(tbl_name), + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=2, + missing_spans=["AdmissionControl"]) + + # Assert the one trace matches the refresh table query. 
+ self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=2, + missing_spans=["AdmissionControl"]) + + def test_invalidate_metadata(self, vector): + result = self.execute_query_expect_success(self.client, + "INVALIDATE METADATA functional.alltypes", + {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + + self.assert_trace( + query_id=result.query_id, + query_profile=result.runtime_profile, + cluster_id="trace_ddl", + trace_cnt=1, + missing_spans=["AdmissionControl"]) diff --git a/tests/util/otel_trace.py b/tests/util/otel_trace.py index 234722953..b763c9a47 100644 --- a/tests/util/otel_trace.py +++ b/tests/util/otel_trace.py @@ -21,6 +21,8 @@ import json import os import sys +from time import sleep + from tests.common.environ import IMPALA_LOCAL_BUILD_VERSION # Valid types of OpenTelemetry attribute values. @@ -106,6 +108,7 @@ class OtelSpan: scope_name: The name of the scope that produced this span. scope_version: The version of the scope that produced this span. attributes: A dictionary of attribute key to AttributeValue object. + events: A dictionary of event name to event time. start_time: The start time of the span in nanoseconds since epoch. end_time: The end time of the span in nanoseconds since epoch. flags: The OpenTelemetry trace flags of the span (represented as an integer). 
@@ -122,6 +125,7 @@ class OtelSpan: self.scope_name = "" self.scope_version = "" self.attributes = {} + self.events = {} self.start_time = 0 self.end_time = 0 self.flags = -1 @@ -136,10 +140,36 @@ class OtelSpan: return self.parent_span_id is None def add_attribute(self, key, value): + if sys.version_info.major < 3: + assert isinstance(key, unicode), "key must be a string" # noqa: F821 + key = str(key) + else: + assert isinstance(key, str), "key must be a string" + + assert isinstance(value, AttributeValue), "Value must be an instance of " \ + "AttributeValue, got: {}".format(type(value)) + self.attributes[key] = value if key == "QueryId": self.query_id = value.value + def add_event(self, name, time_unix_nano): + if sys.version_info.major < 3: + assert isinstance(name, unicode), "Event name must be a string" # noqa: F821 + name = str(name) + assert isinstance(time_unix_nano, unicode), \ + "Time value must be a string" # noqa: F821 + time_unix_nano = str(time_unix_nano) + else: + assert isinstance(name, str), "Event name must be a string" + assert isinstance(time_unix_nano, str), "Time value must be a string" + + try: + self.events[name] = int(time_unix_nano) + except ValueError: + raise ValueError("Could not convert time_unix_nano '{}' to an integer" + .format(time_unix_nano)) + def __str__(self): """ Returns a string representation of the OtelSpan object. @@ -153,6 +183,8 @@ class OtelSpan: self.scope_name, self.scope_version, self.query_id) for k in self.attributes: s += "\n '{}': {},".format(k, self.attributes[k]) + for k in self.events: + s += "\n '{}': {},".format(k, self.events[k]) s += "\n })" return s @@ -226,6 +258,11 @@ def __parse_line(line): key, value = __parse_attr(attr) s.add_attribute(key, value) + # Parse each span event list. 
+ if "events" in span: + for event in span["events"]: + s.add_event(event["name"], event["timeUnixNano"]) + parsed_spans.append(s) except Exception as e: sys.stderr.write("Failed to parse json:\n{}".format(line)) @@ -249,10 +286,33 @@ def parse_trace_file(file_path, query_id): traces_by_query_id = {} parsed_spans = [] - with open(file_path, "r") as f: - lines = f.readlines() - for line in lines: - parsed_spans.extend(__parse_line(line)) + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + try: + with open(file_path, "r") as f: + lines = f.readlines() + for line in lines: + if not line.endswith('\n'): + # Line does not end with a newline, thus the entire trace has not yet been + # written to the file. Retry by restarting the loop + parsed_spans = [] + retry_count += 1 + print("Line doesn't end with newline, retrying (attempt {} of {})" + .format(retry_count, max_retries)) + sleep(1) + break + parsed_spans.extend(__parse_line(line)) + else: + # Successfully read all lines, exit the retry loop. + break + except Exception as e: + retry_count += 1 + if retry_count >= max_retries: + raise + print("Error reading trace file, retrying (attempt {} of {}): {}" + .format(retry_count, max_retries, e)) # Build a map of query_id -> OtelTrace for easy lookup. 
def _parse_int_counter(profile_text, counter_name):
  """Returns the integer value of the 'counter_name' counter from the query profile
  text, or 0 if the counter is not present. The counter must appear at the start of
  a profile line in the form '<counter_name>: <value>'."""
  counter = re.search(r'\n{}:\s+(\d+)'.format(re.escape(counter_name)), profile_text)
  return 0 if counter is None else int(counter.group(1))


def parse_num_modified_rows(profile_text):
  """Parses the number of modified rows (NumModifiedRows counter) from the query
  profile text. Returns 0 if the counter is absent, e.g. for queries that did not
  modify any rows."""
  return _parse_int_counter(profile_text, "NumModifiedRows")


def parse_num_deleted_rows(profile_text):
  """Parses the number of deleted rows (NumDeletedRows counter) from the query
  profile text. Returns 0 if the counter is absent, e.g. for queries that did not
  delete any rows."""
  return _parse_int_counter(profile_text, "NumDeletedRows")


def parse_coordinator(profile_text):
  """Parses the coordinator host:port from the query profile text.

  Raises an AssertionError if no 'Coordinator:' line is found in the profile."""
  coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
  assert coordinator is not None, "Coordinator not found in query profile"
  return coordinator.group(1)