This is an automated email from the ASF dual-hosted git repository. jasonmfehr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2ad6f818a5a3f8bdc0f19be29760cc22f9f4c18a Author: jasonmfehr <jf...@cloudera.com> AuthorDate: Tue May 20 10:57:38 2025 -0700 IMPALA-13237: [Patch 5] - Implement OpenTelemetry Traces for Select Queries Tracking Adds representation of Impala select queries using OpenTelemetry traces. Each Impala query is represented as its own individual OpenTelemetry trace. The one exception is retried queries which will have an individual trace for each attempt. These traces consist of a root span and several child spans. Each child span has the root as its parent. No child span has another child span as its parent. Each child span represents one high-level query lifecycle stage. Each child span also has span attributes that further describe the state of the query. Child spans: 1. Init 2. Submitted 3. Planning 4. Admission Control 5. Query Execution 6. Close Each child span contains a mix of universal attributes (available on all spans) and query phase specific attributes. For example, the "ErrorMsg" attribute, present on all child spans, is the error message (if any) at the end of that particular query phase. One example of a child span specific attribute is "QueryType" on the Planning span. Since query type is first determined during query planning, the "QueryType" attribute is present on the Planning span and has a value of "QUERY" (since only selects are supported). Since queries can run for lengthy periods of time, the Init span communicates the beginning of a query along with global query attributes. For example, span attributes include query id, session id, sql, user, etc. Once the query has closed, the root span is closed. Testing accomplished with new custom cluster tests. 
Generated-by: Github Copilot (GPT-4.1, Claude Sonnet 3.7) Change-Id: Ie40b5cd33274df13f3005bf7a704299ebfff8a5b Reviewed-on: http://gerrit.cloudera.org:8080/22924 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/CMakeLists.txt | 5 + be/src/observe/CMakeLists.txt | 31 ++ be/src/observe/otel-flags-trace.cc | 235 +++++++++ be/src/observe/otel-flags.cc | 60 +++ be/src/observe/otel-instrument.h | 64 +++ be/src/observe/otel.cc | 299 +++++++++++ be/src/observe/otel.h | 57 ++ be/src/observe/span-manager.cc | 473 +++++++++++++++++ be/src/observe/span-manager.h | 167 ++++++ be/src/observe/timed-span.cc | 125 +++++ be/src/observe/timed-span.h | 102 ++++ be/src/runtime/coordinator.cc | 19 +- be/src/runtime/exec-env.cc | 7 + be/src/runtime/query-driver.cc | 4 + be/src/scheduling/admission-control-client.h | 6 +- .../scheduling/local-admission-control-client.cc | 10 +- be/src/scheduling/local-admission-control-client.h | 8 +- .../scheduling/remote-admission-control-client.cc | 12 +- .../scheduling/remote-admission-control-client.h | 8 +- be/src/service/client-request-state.cc | 40 +- be/src/service/client-request-state.h | 11 + be/src/service/impala-server.cc | 28 + bin/impala-config.sh | 6 +- tests/common/custom_cluster_test_suite.py | 58 ++ tests/common/file_utils.py | 20 + tests/custom_cluster/test_otel_trace.py | 581 +++++++++++++++++++++ tests/util/otel_trace.py | 278 ++++++++++ tests/util/query_profile_util.py | 122 +++++ 28 files changed, 2821 insertions(+), 15 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index d2dfe4c3b..9c5fec0ad 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -387,6 +387,7 @@ if (DOXYGEN_FOUND) ${CMAKE_SOURCE_DIR}/be/src/common/ ${CMAKE_SOURCE_DIR}/be/src/exec/ ${CMAKE_SOURCE_DIR}/be/src/exprs/ + ${CMAKE_SOURCE_DIR}/be/src/observe/ ${CMAKE_SOURCE_DIR}/be/src/runtime/ ${CMAKE_SOURCE_DIR}/be/src/scheduling/ 
${CMAKE_SOURCE_DIR}/be/src/service/ @@ -540,6 +541,7 @@ set (IMPALA_LIBS rpc_header_proto rpc_introspection_proto pb_util_proto + Observe Runtime RuntimeIr Scheduling @@ -631,6 +633,7 @@ if (BUILD_SHARED_LIBS) ExecIcebergMetadata CodeGen Exprs + Observe Rpc Service security @@ -878,6 +881,7 @@ add_subdirectory(src/exprs) add_subdirectory(src/kudu/security) add_subdirectory(src/kudu/rpc) add_subdirectory(src/kudu/util) +add_subdirectory(src/observe) add_subdirectory(src/runtime) add_subdirectory(src/scheduling) add_subdirectory(src/statestore) @@ -904,6 +908,7 @@ link_directories( ${CMAKE_CURRENT_SOURCE_DIR}/build/common ${CMAKE_CURRENT_SOURCE_DIR}/build/exec ${CMAKE_CURRENT_SOURCE_DIR}/build/exprs + ${CMAKE_CURRENT_SOURCE_DIR}/build/observe ${CMAKE_CURRENT_SOURCE_DIR}/build/rpc ${CMAKE_CURRENT_SOURCE_DIR}/build/runtime ${CMAKE_CURRENT_SOURCE_DIR}/build/statestore diff --git a/be/src/observe/CMakeLists.txt b/be/src/observe/CMakeLists.txt new file mode 100644 index 000000000..73f835e9e --- /dev/null +++ b/be/src/observe/CMakeLists.txt @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/observe") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/observe") + +add_library(Observe + otel-flags.cc + otel-flags-trace.cc + otel.cc + span-manager.cc + timed-span.cc +) +add_dependencies(Observe gen-deps) diff --git a/be/src/observe/otel-flags-trace.cc b/be/src/observe/otel-flags-trace.cc new file mode 100644 index 000000000..b144736c8 --- /dev/null +++ b/be/src/observe/otel-flags-trace.cc @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Contains coordinator startup flags applicable only to OpenTelemetry traces. 
+ +#include <chrono> +#include <regex> +#include <string> + +#include <boost/algorithm/string/predicate.hpp> +#include <boost/algorithm/string/trim.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gutil/strings/split.h> +#include <opentelemetry/sdk/trace/batch_span_processor_options.h> + +#include "common/status.h" +#include "kudu/util/env.h" +#include "kudu/util/faststring.h" +#include "kudu/util/status.h" +#include "observe/otel.h" +#include "util/gflag-validator-util.h" +#include "util/openssl-util.h" + +using namespace std; +using namespace std::chrono; + +static opentelemetry::sdk::trace::BatchSpanProcessorOptions batch_opts; + +DEFINE_bool(otel_trace_enabled, false, "If set to true, OpenTelemetry traces will be " + "generated and exported to the configured OpenTelemetry collector."); + +// Specifies the OTel exporter. This flag is hidden because the file exporter is used for +// testing and not supported in production. +DEFINE_string_hidden(otel_trace_exporter, impala::OTEL_EXPORTER_OTLP_HTTP.c_str(), + "The trace exporter to use for OpenTelemetry spans. Supported values: 'otlp_http' " + "and 'file'."); +DEFINE_validator(otel_trace_exporter, [](const char* flagname, const string& value) { + if (value == impala::OTEL_EXPORTER_OTLP_HTTP + || value == impala::OTEL_EXPORTER_FILE) { + return true; + } + + LOG(ERROR) << "Flag '" << flagname << "' must be one of: '" + << impala::OTEL_EXPORTER_OTLP_HTTP << "', '" << impala::OTEL_EXPORTER_FILE << "'."; + return false; +}); // flag otel_trace_exporter + + +// +// Start of HTTP related flags. 
+// +DEFINE_string(otel_trace_collector_url, "", "The URL of the OpenTelemetry collector to " + "which trace data will be exported."); +DEFINE_validator(otel_trace_collector_url, [](const char* flagname, const string& value) { + if (value.empty()) { + return true; + } + + // Check if URL starts with http:// or https:// + if (!(value.rfind("http://", 0) == 0 || value.rfind("https://", 0) == 0)) { + LOG(ERROR) << "Flag '" << flagname << "' must start with 'http://' or 'https://'"; + return false; + } + + return true; +}); // flag otel_trace_collector_url + +DEFINE_string(otel_trace_additional_headers, "", "List of additional HTTP headers to be " + "sent with each call to the OTel Collector. Individual headers are separated by a " + "delimiter of three colons. Format is 'key1=value1:::key2=value2:::key3=value3'."); +DEFINE_validator(otel_trace_additional_headers, [](const char* flagname, + const string& value) { + bool valid = true; + + if (!value.empty()) { + for (auto header : strings::Split(value, ":::")) { + if (header.find('=') == string::npos) { + LOG(ERROR) << "Flag '" << flagname << "' contains an invalid header " + "(missing '='): " << header; + valid = false; + } + } + } + + return valid; +}); // flag otel_trace_additional_headers + +DEFINE_bool(otel_trace_compression, true, "If set to true, uses ZLib compression for " + "sending data to the OTel Collector. If set to false, sends data uncompressed."); + +DEFINE_int32(otel_trace_timeout_s, 10, "Export timeout in seconds."); + +DEFINE_validator(otel_trace_timeout_s, ge_one); +// +// End of HTTP related flags. +// + + +// +// Start of TLS related flags. 
+//
+
+// Runs impala::ValidatePemBundle() on 'value'. When the returned status is not ok, logs
+// an error containing 'flagname' and the status detail and returns false. Returns true
+// otherwise. Shared by the validators of the CA certificate flags below.
+static bool validate_pem_bundle_string(const char* flagname, const string& value) {
+  impala::Status s = impala::ValidatePemBundle(value);
+
+  if (!s.ok()) {
+    // The flag name is enclosed in a matching pair of single quotes.
+    LOG(ERROR) << "Flag '" << flagname << "' failed validation: " << s.GetDetail();
+    return false;
+  }
+
+  return true;
+} // function validate_pem_bundle_string
+
+DEFINE_string(otel_trace_ca_cert_path, "", "Path to a file containing CA certificates "
+    "bundle. Combined with 'otel_trace_ca_cert_string' if both are specified. ");
+// An empty value is accepted. A non-empty value must name a readable file whose contents
+// pass validate_pem_bundle_string().
+DEFINE_validator(otel_trace_ca_cert_path, [](const char* flagname, const string& value) {
+  if (!value.empty()) {
+    kudu::faststring contents;
+    kudu::Status s = kudu::ReadFileToString(kudu::Env::Default(), value, &contents);
+
+    if (!s.ok()) {
+      LOG(ERROR) << "Flag '" << flagname << "' must point to a valid file: " << value;
+      return false;
+    }
+
+    return validate_pem_bundle_string(flagname, contents.ToString());
+  }
+
+  return true;
+}); // flag otel_trace_ca_cert_path
+
+DEFINE_string(otel_trace_ca_cert_string, "", "String containing CA certificates bundle. 
" + "Combined with 'otel_trace_ca_cert_path' if both are specified."); +DEFINE_validator(otel_trace_ca_cert_string, [](const char* flagname, + const string& value) { + if (!value.empty()) { + return validate_pem_bundle_string(flagname, value); + } + + return true; +}); // flag otel_trace_ca_cert_string + + +DEFINE_string(otel_trace_tls_minimum_version, "", "String containing the minimum allowed " + "TLS version, if not specified, defaults to the overall minimum TLS version."); +DEFINE_validator(otel_trace_tls_minimum_version, [](const char* flagname, + const string& value) { + if (value.empty() || value == "1.2" || value == "1.3") { + return true; + } + LOG(ERROR) << "Flag '" << flagname << "' must be empty or one of: '1.2', '1.3'."; + return false; +}); // flag otel_trace_tls_minimum_version + +DEFINE_string(otel_trace_ssl_ciphers, "", "List of allowed TLS cipher suites when using " + "TLS 1.2, default to the value of Impala’s ssl_cipher_list startup flag."); + +DEFINE_string(otel_trace_tls_cipher_suites, "", "List of allowed TLS cipher suites when " + "using TLS 1.3, default to the value of Impala’s tls_ciphersuites startup flag."); + +DEFINE_bool(otel_trace_tls_insecure_skip_verify, false, "If set to true, skips " + "verification of collector’s TLS certificate."); +// +// End of TLS related flags. +// + + +// +// Start of retry policy flags +// +DEFINE_int32(otel_trace_retry_policy_max_attempts, 5, "Maximum number of call attempts, " + "including the original attempt."); +DEFINE_validator(otel_trace_retry_policy_max_attempts, gt_zero); + +DEFINE_double(otel_trace_retry_policy_initial_backoff_s, 1.0, "Initial backoff delay " + "between retry attempts in seconds."); +DEFINE_validator(otel_trace_retry_policy_initial_backoff_s, ge_one); + +DEFINE_int32(otel_trace_retry_policy_max_backoff_s, 0, "Maximum backoff delay between " + "retry attempts in seconds. 
Value of 0 or less indicates not set.");
+
+DEFINE_double(otel_trace_retry_policy_backoff_multiplier, 2.0, "Backoff will be "
+    "multiplied by this value after each retry attempt.");
+DEFINE_validator(otel_trace_retry_policy_backoff_multiplier, ge_one);
+//
+// End of retry policy flags
+//
+
+
+//
+// Start of Span Processor flags
+//
+static const string SPAN_PROCESSOR_HELP = "The span processor implementation to use for "
+    "exporting spans to the OTel Collector. Supported values: '"
+    + impala::SPAN_PROCESSOR_BATCH + "' and '" + impala::SPAN_PROCESSOR_SIMPLE + "'.";
+DEFINE_string(otel_trace_span_processor, impala::SPAN_PROCESSOR_BATCH.c_str(),
+    SPAN_PROCESSOR_HELP.c_str());
+// Accepts the supported span processor names after trimming surrounding whitespace and
+// ignoring case. Any other value is rejected with an error log that names the flag and
+// the supported values, matching the behavior of the otel_trace_exporter validator.
+DEFINE_validator(otel_trace_span_processor, [](const char* flagname,
+    const string& value) {
+  const std::string trimmed = boost::algorithm::trim_copy(value);
+  if (boost::iequals(trimmed, impala::SPAN_PROCESSOR_BATCH)
+      || boost::iequals(trimmed, impala::SPAN_PROCESSOR_SIMPLE)) {
+    return true;
+  }
+
+  LOG(ERROR) << "Flag '" << flagname << "' must be one of (case-insensitive): '"
+      << impala::SPAN_PROCESSOR_BATCH << "', '" << impala::SPAN_PROCESSOR_SIMPLE << "'.";
+  return false;
+}); // flag otel_trace_span_processor
+
+DEFINE_int32(otel_trace_batch_queue_size, batch_opts.max_queue_size, "The maximum "
+    "buffer/queue size. After the size is reached, spans are dropped. Applicable when "
+    "'otel_trace_span_processor' is 'batch'.");
+DEFINE_validator(otel_trace_batch_queue_size, ge_one);
+
+DEFINE_int32(otel_trace_batch_schedule_delay_ms, batch_opts.schedule_delay_millis.count(),
+    "The delay interval in milliseconds between two consecutive batch exports. "
+    "Applicable when 'otel_trace_span_processor' is 'batch'.");
+DEFINE_validator(otel_trace_batch_schedule_delay_ms, ge_one);
+
+DEFINE_int32(otel_trace_batch_max_batch_size, batch_opts.max_export_batch_size,
+    "The maximum batch size of every export to OTel Collector. 
Applicable when " + "'otel_trace_span_processor' is 'batch'."); +DEFINE_validator(otel_trace_batch_max_batch_size, ge_one); +// +// End of Span Processor flags +// diff --git a/be/src/observe/otel-flags.cc b/be/src/observe/otel-flags.cc new file mode 100644 index 000000000..b4958120f --- /dev/null +++ b/be/src/observe/otel-flags.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Contains coordinator startup flags applicable to all of OpenTelemetry. 
+ +#include <chrono> + +#include <opentelemetry/exporters/otlp/otlp_file_client_options.h> + +#include "util/gflag-validator-util.h" + +using namespace std; +using namespace std::chrono; + +DEFINE_bool(otel_debug, false, "If set to true, outputs additional debug info"); + +// +// Start of OTLP File Exporter flags +// +static opentelemetry::exporter::otlp::OtlpFileClientFileSystemOptions file_opts; + +DEFINE_string_hidden(otel_file_pattern, file_opts.file_pattern, "Pattern to create " + "output file for the OTLP file exporter."); + +DEFINE_string_hidden(otel_file_alias_pattern, file_opts.alias_pattern, "Pattern to " + "create alias file path for the latest file rotation for the OTLP file exporter."); + +DEFINE_int32_hidden(otel_file_flush_interval_ms, duration_cast<milliseconds>( + file_opts.flush_interval).count(), "Flush interval in milliseconds for the OTLP " + "file exporter."); +DEFINE_validator(otel_file_flush_interval_ms, ge_one); + +DEFINE_int32_hidden(otel_file_flush_count, file_opts.flush_count, "Flush record count " + "for the OTLP file exporter."); +DEFINE_validator(otel_file_flush_count, ge_one); + +DEFINE_int32_hidden(otel_file_max_file_size, file_opts.file_size, "Maximum file size in " + "bytes for the OTLP file exporter."); +DEFINE_validator(otel_file_max_file_size, ge_one); + +DEFINE_int32_hidden(otel_file_max_file_count, file_opts.rotate_size, "Maximum file count " + "(rotate size) for the OTLP file exporter."); +DEFINE_validator(otel_file_max_file_count, ge_one); +// +// End of OTLP File Exporter flags +// diff --git a/be/src/observe/otel-instrument.h b/be/src/observe/otel-instrument.h new file mode 100644 index 000000000..4a9260978 --- /dev/null +++ b/be/src/observe/otel-instrument.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifndef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
+#define ENABLE_THREAD_INSTRUMENTATION_PREVIEW
+#endif
+
+#include <string>
+
+#include <glog/logging.h>
+#include <opentelemetry/sdk/common/thread_instrumentation.h>
+
+namespace impala {
+
+// ThreadInstrumentation implementation that writes one VLOG(2) message for every
+// callback it receives. Each message is prefixed with the thread type supplied at
+// construction so that log lines from different OpenTelemetry threads can be told apart.
+class LoggingInstrumentation : public opentelemetry::sdk::common::ThreadInstrumentation {
+
+public:
+  // 'thread_type' is copied and used as the prefix of every log message. The constructor
+  // is explicit so that a string is never implicitly converted into an instrumentation
+  // object.
+  explicit LoggingInstrumentation(const std::string& thread_type)
+    : thread_type_(thread_type) {}
+
+  void OnStart() override {
+    VLOG(2) << thread_type_ << " opentelemetry thread started";
+  }
+
+  void OnEnd() override {
+    VLOG(2) << thread_type_ << " opentelemetry thread ended";
+  }
+
+  void BeforeWait() override {
+    VLOG(2) << thread_type_ << " opentelemetry thread before wait";
+  }
+
+  void AfterWait() override {
+    VLOG(2) << thread_type_ << " opentelemetry thread after wait";
+  }
+
+  void BeforeLoad() override {
+    VLOG(2) << thread_type_ << " opentelemetry thread before load";
+  }
+
+  void AfterLoad() override {
+    VLOG(2) << thread_type_ << " opentelemetry thread after load";
+  }
+
+private:
+  // Label identifying the kind of thread being instrumented.
+  const std::string thread_type_;
+}; // class LoggingInstrumentation
+
+} // namespace impala
 diff --git a/be/src/observe/otel.cc b/be/src/observe/otel.cc
 new file mode 100644
 index 000000000..456c8030d
 --- /dev/null
 +++ b/be/src/observe/otel.cc
 @@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "otel.h" + +#include <chrono> +#include <memory> +#include <string> +#include <utility> + +#include <boost/algorithm/string/case_conv.hpp> +#include <boost/algorithm/string/predicate.hpp> +#include <boost/algorithm/string/trim.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gutil/strings/split.h> +#include <opentelemetry/exporters/otlp/otlp_file_exporter.h> +#include <opentelemetry/exporters/otlp/otlp_file_exporter_factory.h> +#include <opentelemetry/exporters/otlp/otlp_file_exporter_options.h> +#include <opentelemetry/exporters/otlp/otlp_http.h> +#include <opentelemetry/exporters/otlp/otlp_http_exporter.h> +#include <opentelemetry/exporters/otlp/otlp_http_exporter_factory.h> +#include <opentelemetry/exporters/otlp/otlp_http_exporter_options.h> +#include <opentelemetry/exporters/otlp/otlp_http_exporter_runtime_options.h> +#include <opentelemetry/sdk/resource/resource.h> +#include <opentelemetry/sdk/trace/batch_span_processor.h> +#include <opentelemetry/sdk/trace/batch_span_processor_factory.h> +#include <opentelemetry/sdk/trace/batch_span_processor_options.h> +#include <opentelemetry/sdk/trace/batch_span_processor_runtime_options.h> +#include <opentelemetry/sdk/trace/exporter.h> +#include 
<opentelemetry/sdk/trace/processor.h> +#include <opentelemetry/sdk/trace/tracer_provider.h> +#include <opentelemetry/sdk/trace/tracer_provider_factory.h> +#include <opentelemetry/sdk/trace/simple_processor.h> +#include <opentelemetry/trace/provider.h> +#include <opentelemetry/trace/tracer.h> +#include <opentelemetry/version.h> + +#include "common/status.h" +#include "common/version.h" +#include "observe/otel-instrument.h" +#include "observe/span-manager.h" +#include "service/client-request-state.h" + +using namespace boost::algorithm; +using namespace opentelemetry; +using namespace opentelemetry::exporter::otlp; +using namespace opentelemetry::sdk::trace; +using namespace std; + +// OTel related flags +DECLARE_string(otel_trace_additional_headers); +DECLARE_int32(otel_trace_batch_queue_size); +DECLARE_int32(otel_trace_batch_max_batch_size); +DECLARE_int32(otel_trace_batch_schedule_delay_ms); +DECLARE_string(otel_trace_ca_cert_path); +DECLARE_string(otel_trace_ca_cert_string); +DECLARE_string(otel_trace_collector_url); +DECLARE_bool(otel_trace_compression); +DECLARE_bool(otel_debug); +DECLARE_string(otel_trace_exporter); +DECLARE_string(otel_file_pattern); +DECLARE_string(otel_file_alias_pattern); +DECLARE_int32(otel_file_flush_interval_ms); +DECLARE_int32(otel_file_flush_count); +DECLARE_int32(otel_file_max_file_size); +DECLARE_int32(otel_file_max_file_count); +DECLARE_double(otel_trace_retry_policy_backoff_multiplier); +DECLARE_double(otel_trace_retry_policy_initial_backoff_s); +DECLARE_int32(otel_trace_retry_policy_max_attempts); +DECLARE_int32(otel_trace_retry_policy_max_backoff_s); +DECLARE_string(otel_trace_span_processor); +DECLARE_string(otel_trace_ssl_ciphers); +DECLARE_int32(otel_trace_timeout_s); +DECLARE_string(otel_trace_tls_cipher_suites); +DECLARE_bool(otel_trace_tls_insecure_skip_verify); +DECLARE_string(otel_trace_tls_minimum_version); +DECLARE_bool(otel_trace_enabled); + +// Other flags +DECLARE_string(ssl_cipher_list); 
+DECLARE_string(tls_ciphersuites);
+DECLARE_string(ssl_minimum_version);
+
+// Constants
+static const string SCOPE_SPAN_NAME = "org.apache.impala.impalad.query";
+
+namespace impala {
+
+// TraceProvider is a singleton that provides access to the OpenTelemetry TracerProvider.
+// Not shared globally via opentelemetry::trace::Provider::SetTracerProvider to enforce
+// all tracing to go through the Impala-specific code interfaces to OpenTelemetry tracing.
+static unique_ptr<trace::TracerProvider> provider_;
+
+// Returns true if any TLS configuration flags are set for the OTel exporter. The
+// otel_trace_tls_insecure_skip_verify flag is included so that setting it on its own is
+// enough for the exporter's TLS options to be applied.
+static inline bool otel_tls_enabled() {
+  return !FLAGS_otel_trace_ca_cert_path.empty()
+      || !FLAGS_otel_trace_ca_cert_string.empty()
+      || !FLAGS_otel_trace_tls_minimum_version.empty()
+      || !FLAGS_otel_trace_ssl_ciphers.empty()
+      || !FLAGS_otel_trace_tls_cipher_suites.empty()
+      || FLAGS_otel_trace_tls_insecure_skip_verify;
+} // function otel_tls_enabled
+
+// Returns the value of the otel_trace_enabled startup flag.
+bool otel_trace_enabled() {
+  return FLAGS_otel_trace_enabled;
+} // function otel_trace_enabled
+
+// Returns true if 'sql' begins with "select " (compared case-insensitively). A null
+// 'sql' trips the DCHECK in debug builds and yields false in release builds.
+bool should_otel_trace_query(const char* sql) {
+  DCHECK(sql != nullptr) << "SQL statement cannot be null.";
+
+  // DCHECK is compiled out of release builds, so guard the dereference explicitly.
+  if (sql == nullptr) {
+    return false;
+  }
+
+  return boost::algorithm::istarts_with(sql, "select ");
+} // function should_otel_trace_query
+
+// Initializes an OtlpHttpExporter instance with configuration from global flags. The
+// OtlpHttpExporter instance implements the SpanExporter interface. The function parameter
+// `exporter` is an output parameter that will be populated with the created
+// OtlpHttpExporter instance. Returns Status::OK() on success, or an error Status if
+// configuration fails.
+static Status init_exporter_http(unique_ptr<SpanExporter>& exporter) {
+  // Configure OTLP HTTP exporter
+  OtlpHttpExporterOptions opts;
+  opts.url = FLAGS_otel_trace_collector_url;
+  opts.content_type = HttpRequestContentType::kJson;
+  opts.timeout = chrono::seconds(FLAGS_otel_trace_timeout_s);
+  opts.console_debug = FLAGS_otel_debug;
+
+  // Retry settings
+  opts.retry_policy_max_attempts = FLAGS_otel_trace_retry_policy_max_attempts;
+  opts.retry_policy_initial_backoff =
+      chrono::duration<float>(FLAGS_otel_trace_retry_policy_initial_backoff_s);
+  // A max backoff of 0 or less means "not set", so the exporter default is kept.
+  if (FLAGS_otel_trace_retry_policy_max_backoff_s > 0) {
+    opts.retry_policy_max_backoff = chrono::duration<float>(
+        chrono::seconds(FLAGS_otel_trace_retry_policy_max_backoff_s));
+  }
+  opts.retry_policy_backoff_multiplier = FLAGS_otel_trace_retry_policy_backoff_multiplier;
+
+  // Compression Type
+  if (FLAGS_otel_trace_compression) {
+    opts.compression = "zlib";
+  }
+
+  // TLS Configurations
+  if (otel_tls_enabled()) {
+    if (FLAGS_otel_trace_tls_minimum_version.empty()) {
+      // Set minimum TLS version to the value of the global ssl_minimum_version flag.
+      // Since this flag is in the format "tlsv1.2" or "tlsv1.3", we need to
+      // convert it to the format expected by OtlpHttpExporterOptions.
+      const string min_ssl_ver = to_lower_copy(trim_copy(FLAGS_ssl_minimum_version));
+
+      // An empty ssl_minimum_version leaves ssl_min_tls untouched. The prefix is only
+      // stripped after it is known to be present because substr(4) on a string shorter
+      // than four characters throws std::out_of_range.
+      if (!min_ssl_ver.empty()) {
+        if (min_ssl_ver.rfind("tlsv", 0) != 0) {
+          return Status("ssl_minimum_version must start with 'tlsv'");
+        }
+
+        opts.ssl_min_tls = min_ssl_ver.substr(4); // Remove "tlsv" prefix
+      }
+    } else {
+      opts.ssl_min_tls = FLAGS_otel_trace_tls_minimum_version;
+    }
+
+    opts.ssl_insecure_skip_verify = FLAGS_otel_trace_tls_insecure_skip_verify;
+    opts.ssl_ca_cert_path = FLAGS_otel_trace_ca_cert_path;
+    opts.ssl_ca_cert_string = FLAGS_otel_trace_ca_cert_string;
+    opts.ssl_max_tls = "1.3";
+    // The OTel specific cipher flags take precedence. The global Impala flags are the
+    // fallback when they are empty.
+    opts.ssl_cipher = FLAGS_otel_trace_ssl_ciphers.empty() ? FLAGS_ssl_cipher_list :
+        FLAGS_otel_trace_ssl_ciphers;
+    opts.ssl_cipher_suite = FLAGS_otel_trace_tls_cipher_suites.empty() ?
+        FLAGS_tls_ciphersuites : FLAGS_otel_trace_tls_cipher_suites;
+  }
+
+  // Additional HTTP headers
+  if (!FLAGS_otel_trace_additional_headers.empty()) {
+    for (auto header : strings::Split(FLAGS_otel_trace_additional_headers, ":::")) {
+      // Everything before the first '=' is the header name, the remainder is its value.
+      auto pos = header.find('=');
+      const string key = trim_copy(header.substr(0, pos).as_string());
+      const string value = trim_copy(header.substr(pos + 1).as_string());
+
+      // Only the header name is logged because header values may carry credentials.
+      VLOG(2) << "Adding additional OTel header: " << key;
+      opts.http_headers.emplace(key, value);
+    }
+  }
+
+  if (FLAGS_otel_debug) {
+    // Resolved through the using-directive for opentelemetry::exporter::otlp instead of
+    // spelling out the library's versioned ABI namespace.
+    OtlpHttpExporterRuntimeOptions runtime_opts;
+    runtime_opts.thread_instrumentation =
+        make_shared<LoggingInstrumentation>("http_exporter");
+    exporter = OtlpHttpExporterFactory::Create(opts, runtime_opts);
+  } else {
+    exporter = OtlpHttpExporterFactory::Create(opts);
+  }
+
+  return Status::OK();
+} // function init_exporter_http
+
+// Initializes an OtlpFileExporter instance with configuration from global flags. The
+// OtlpFileExporter instance implements the SpanExporter interface. Returns a unique_ptr
+// which will always be initialized with the created OtlpFileExporter instance.
+//
+// The file exporter is for test use only.
+static unique_ptr<SpanExporter> init_exporter_file() {
+  OtlpFileClientFileSystemOptions file_client_opts;
+
+  file_client_opts.file_pattern = FLAGS_otel_file_pattern;
+  file_client_opts.alias_pattern = FLAGS_otel_file_alias_pattern;
+  // The flag value is in milliseconds and is converted to microseconds here.
+  file_client_opts.flush_interval = chrono::microseconds(chrono::milliseconds(
+      FLAGS_otel_file_flush_interval_ms));
+  file_client_opts.flush_count = FLAGS_otel_file_flush_count;
+  file_client_opts.file_size = FLAGS_otel_file_max_file_size;
+  file_client_opts.rotate_size = FLAGS_otel_file_max_file_count;
+
+  OtlpFileExporterOptions exporter_opts;
+  exporter_opts.backend_options = file_client_opts;
+  exporter_opts.console_debug = FLAGS_otel_debug;
+
+  return OtlpFileExporterFactory::Create(exporter_opts);
+} // function init_exporter_file
+
+// Initializes the OpenTelemetry Tracer singleton with the configuration defined in the
+// coordinator startup flags. Returns Status::OK() on success, or an error Status if
+// configuration fails.
+//
+// The exporter is the OTLP HTTP exporter when the otel_trace_exporter flag equals
+// OTEL_EXPORTER_OTLP_HTTP and the file exporter for any other value. The span processor
+// is the batch processor when the otel_trace_span_processor flag (trimmed, compared
+// case-insensitively) equals SPAN_PROCESSOR_BATCH and the simple processor otherwise.
+Status init_otel_tracer() {
+  LOG(INFO) << "Initializing OpenTelemetry tracing.";
+  VLOG(2) << "OpenTelemetry version: " << OPENTELEMETRY_VERSION;
+  VLOG(2) << "OpenTelemetry ABI version: " << OPENTELEMETRY_ABI_VERSION;
+  VLOG(2) << "OpenTelemetry namespace: "
+      << OPENTELEMETRY_STRINGIFY(OPENTELEMETRY_NAMESPACE);
+
+  unique_ptr<SpanExporter> exporter;
+
+  if(FLAGS_otel_trace_exporter == OTEL_EXPORTER_OTLP_HTTP) {
+    RETURN_IF_ERROR(init_exporter_http(exporter));
+  } else {
+    exporter = init_exporter_file();
+  }
+  VLOG(2) << "OpenTelemetry exporter: " << FLAGS_otel_trace_exporter;
+
+  // Set up tracer provider
+  unique_ptr<SpanProcessor> processor;
+
+  if (boost::iequals(trim_copy(FLAGS_otel_trace_span_processor), SPAN_PROCESSOR_BATCH)) {
+    VLOG(2) << "Using BatchSpanProcessor for OpenTelemetry spans";
+    BatchSpanProcessorOptions batch_opts;
+
+    batch_opts.max_queue_size = FLAGS_otel_trace_batch_queue_size;
+    batch_opts.max_export_batch_size = FLAGS_otel_trace_batch_max_batch_size;
+    batch_opts.schedule_delay_millis =
+        chrono::milliseconds(FLAGS_otel_trace_batch_schedule_delay_ms);
+
+    // Ownership of the exporter is transferred to whichever processor is created.
+    if (FLAGS_otel_debug) {
+      BatchSpanProcessorRuntimeOptions runtime_opts;
+      runtime_opts.thread_instrumentation =
+          make_shared<LoggingInstrumentation>("batch_span_processor");
+      processor = BatchSpanProcessorFactory::Create(move(exporter), batch_opts,
+          runtime_opts);
+    } else {
+      processor = BatchSpanProcessorFactory::Create(move(exporter), batch_opts);
+    }
+  } else {
+    VLOG(2) << "Using SimpleSpanProcessor for OTel spans";
+    processor = make_unique<SimpleSpanProcessor>(move(exporter));
+  }
+
+  // The provider takes ownership of the processor and is tagged with the service name
+  // and the daemon build version as resource attributes.
+  provider_ = TracerProviderFactory::Create(move(processor),
+      sdk::resource::Resource::Create({
+        {"service.name", "Impala"},
+        {"service.version", GetDaemonBuildVersion()}
+      }));
+
+  return Status::OK();
+} // function init_otel_tracer
+
+// Releases the TracerProvider singleton. Expects init_otel_tracer() to have been called
+// first (enforced by a DCHECK in debug builds only).
+void shutdown_otel_tracer() {
+  LOG(INFO) << "Shutting down OpenTelemetry tracing.";
+  DCHECK(provider_) << "OpenTelemetry tracer was not initialized.";
+
+  // Reset the provider_ unique_ptr so that the TracerProvider it owns is destroyed.
+  // NOTE(review): the original comment states that the TracerProvider destructor
+  // flushes any remaining spans; that behavior lives in the OpenTelemetry SDK - confirm.
+  provider_.reset();
+}
+
+// Creates a SpanManager for the query represented by 'crs'. The SpanManager is given a
+// tracer obtained from provider_ using SCOPE_SPAN_NAME and SCOPE_SPAN_SPEC_VERSION.
+// Expects init_otel_tracer() to have been called first (enforced by a DCHECK in debug
+// builds only).
+shared_ptr<SpanManager> build_span_manager(const ClientRequestState* crs) {
+  DCHECK(provider_) << "OpenTelemetry tracer was not initialized.";
+
+  return make_shared<SpanManager>(
+      provider_->GetTracer(SCOPE_SPAN_NAME, SCOPE_SPAN_SPEC_VERSION), crs);
+} // function build_span_manager
+
+} // namespace impala
\ No newline at end of file
 diff --git a/be/src/observe/otel.h b/be/src/observe/otel.h
 new file mode 100644
 index 000000000..35e79a810
 --- /dev/null
 +++ b/be/src/observe/otel.h
 @@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> + +#include "common/status.h" +#include "observe/span-manager.h" +#include "service/client-request-state.h" + +namespace impala { + +// Version of the spec for representing Impala queries as OpenTelemetry traces. +const std::string SCOPE_SPAN_SPEC_VERSION = "1.0.0"; + +// Constants representing the supported OpenTelemetry exporters. +const std::string OTEL_EXPORTER_OTLP_HTTP = "otlp_http"; +const std::string OTEL_EXPORTER_FILE = "file"; + +// Constants representing the supported OpenTelemetry span processor implementations. +const std::string SPAN_PROCESSOR_SIMPLE = "simple"; +const std::string SPAN_PROCESSOR_BATCH = "batch"; + +// Returns true if OpenTelemetry tracing is enabled, false otherwise. +bool otel_trace_enabled(); + +// Returns true if an OpenTelemetry trace needs to be created for the given SQL query. +bool should_otel_trace_query(const char* sql); + +// Initializes the OpenTelemetry tracer with the configuration defined in the coordinator +// startup flags (see otel-flags.cc and otel-flags-trace.cc for the list). Does not verify +// that OpenTelemetry tracing is enabled (otel_trace_enabled flag). +Status init_otel_tracer(); + +// Force flushes any buffered spans and shuts down the OpenTelemetry tracer. +void shutdown_otel_tracer(); + +// Builds a SpanManager instance for the given query. 
+std::shared_ptr<SpanManager> build_span_manager(const ClientRequestState*); + +} // namespace impala diff --git a/be/src/observe/span-manager.cc b/be/src/observe/span-manager.cc new file mode 100644 index 000000000..3b4218937 --- /dev/null +++ b/be/src/observe/span-manager.cc @@ -0,0 +1,473 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "observe/span-manager.h" + +#include <chrono> +#include <memory> +#include <mutex> +#include <string> +#include <utility> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include "gutil/strings/substitute.h" +#include <opentelemetry/nostd/shared_ptr.h> +#include <opentelemetry/trace/span_metadata.h> +#include <opentelemetry/trace/scope.h> +#include <opentelemetry/trace/tracer.h> + +#include "common/compiler-util.h" +#include "gen-cpp/Types_types.h" +#include "observe/timed-span.h" +#include "scheduling/admission-control-client.h" +#include "service/client-request-state.h" +#include "util/debug-util.h" + +using namespace opentelemetry; +using namespace std; + +DECLARE_string(cluster_id); +DECLARE_int32(otel_trace_retry_policy_max_attempts); +DECLARE_int32(otel_trace_retry_policy_max_backoff_s); + +// Names of attributes only on Root spans. 
+static constexpr char const* ATTR_ERROR_MESSAGE = "ErrorMessage"; +static constexpr char const* ATTR_QUERY_START_TIME = "QueryStartTime"; +static constexpr char const* ATTR_RETRIED_QUERY_ID = "RetriedQueryId"; +static constexpr char const* ATTR_RUNTIME = "Runtime"; +static constexpr char const* ATTR_STATE = "State"; + +// Names of attributes on both Root and one or more child spans. +static constexpr char const* ATTR_CLUSTER_ID = "ClusterId"; +static constexpr char const* ATTR_ORIGINAL_QUERY_ID = "OriginalQueryId"; +static constexpr char const* ATTR_QUERY_ID = "QueryId"; +static constexpr char const* ATTR_QUERY_TYPE = "QueryType"; +static constexpr char const* ATTR_REQUEST_POOL = "RequestPool"; +static constexpr char const* ATTR_SESSION_ID = "SessionId"; +static constexpr char const* ATTR_USER_NAME = "UserName"; + +// Names of attributes common to all child spans. +static constexpr char const* ATTR_BEGIN_TIME = "BeginTime"; +static constexpr char const* ATTR_ELAPSED_TIME = "ElapsedTime"; +static constexpr char const* ATTR_ERROR_MSG = "ErrorMsg"; +static constexpr char const* ATTR_NAME = "Name"; +static constexpr char const* ATTR_RUNNING = "Running"; +static constexpr char const* ATTR_STATUS = "Status"; + +// Names of attributes only on Init child spans. +static constexpr char const* ATTR_DEFAULT_DB = "DefaultDb"; +static constexpr char const* ATTR_QUERY_STRING = "QueryString"; + +// Names of attributes only on Admission Control child spans. +static constexpr char const* ATTR_ADM_RESULT = "AdmissionResult"; +static constexpr char const* ATTR_QUEUED = "Queued"; + +// Names of attributes only on Query Execution child spans. +static constexpr char const* ATTR_NUM_DELETED_ROWS = "NumDeletedRows"; +static constexpr char const* ATTR_NUM_MODIFIED_ROWS = "NumModifiedRows"; +static constexpr char const* ATTR_NUM_ROWS_FETCHED = "NumRowsFetched"; + +// Names of the child spans. 
+static constexpr char const* CHILD_SPAN_NAMES[] = { + "None", "Init", "Submitted", "Planning", "AdmissionControl", "QueryExecution", + "Close"}; + +#define DCHECK_CHILD_SPAN_TYPE(expected_type) \ + DCHECK(child_span_type_ == expected_type) << "Expected child span type '" \ + << expected_type << "' but received '" << child_span_type_ << "' instead." +// macro DCHECK_CHILD_SPAN_TYPE + +namespace impala { + +// Helper function to convert ChildSpanType enum values to strings. +static inline string to_string(const ChildSpanType& val) { + return CHILD_SPAN_NAMES[static_cast<int>(val)]; +} // function to_string + +// Helper functions to stream the string representation of ChildSpanType enum values. +static inline ostream& operator<<(ostream& out, const ChildSpanType& val) { + out << to_string(val); + return out; +} // operator<< + +// Helper function to log the start and end of a span with debug information. Callers +// must hold the child_span_mu_ lock when calling this function. +static inline void debug_log_span(const TimedSpan* span, const string& span_name, + const string& query_id, bool started) { + DCHECK(span != nullptr) << "Cannot log null span."; + + if (LIKELY(span != nullptr)) { + VLOG(2) << strings::Substitute("$0 '$1' span trace_id=\"$2\" span_id=\"$3\" " + "query_id=\"$4\"", (started ? 
"Started" : "Ended"), span_name, span->TraceId(), + span->SpanId(), query_id); + } else { + LOG(WARNING) << "Attempted to log span '" << span_name << "' but provided span is " + "null."; + } +} // function debug_log_span + +SpanManager::SpanManager(nostd::shared_ptr<trace::Tracer> tracer, + const ClientRequestState* client_request_state) : tracer_(std::move(tracer)), + client_request_state_(client_request_state), + query_id_(PrintId(client_request_state_->query_id())) { + child_span_type_ = ChildSpanType::NONE; + + DCHECK(client_request_state_ != nullptr) << "Cannot start root span without a valid " + "client request state."; + + root_ = make_shared<TimedSpan>(tracer_, query_id_, ATTR_QUERY_START_TIME, ATTR_RUNTIME, + OtelAttributesMap{ + {ATTR_CLUSTER_ID, FLAGS_cluster_id}, + {ATTR_QUERY_ID, query_id_}, + {ATTR_REQUEST_POOL, client_request_state_->request_pool()}, + {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())}, + {ATTR_USER_NAME, client_request_state_->effective_user()} + }, + trace::SpanKind::kServer); + + scope_ = make_unique<trace::Scope>(root_->SetActive()); + debug_log_span(root_.get(), "Root", query_id_, true); +} // ctor + +SpanManager::~SpanManager() { + lock_guard<mutex> l(child_span_mu_); + + root_->End(); + debug_log_span(root_.get(), "Root", query_id_, false); + LOG(INFO) << strings::Substitute("Closed OpenTelemetry trace with trace_id=\"$0\" " + "span_id=\"$1\"", root_->TraceId(), root_->SpanId()); + + scope_.reset(); + root_.reset(); + + tracer_->Close(chrono::seconds(FLAGS_otel_trace_retry_policy_max_backoff_s * + FLAGS_otel_trace_retry_policy_max_attempts)); +} // dtor + +void SpanManager::AddChildSpanEvent(const nostd::string_view& name) { + lock_guard<mutex> l(child_span_mu_); + + if (LIKELY(current_child_)) { + current_child_->AddEvent(name); + VLOG(2) << strings::Substitute("Adding event named '$0' to child span '$1' " + "trace_id=\"$2\" span_id=\"$3", name.data(), to_string(child_span_type_), + current_child_->TraceId(), 
current_child_->SpanId()); + } else { + LOG(WARNING) << strings::Substitute("Attempted to add event '$0' with no active " + "child span trace_id=\"$1\" span_id=\"$2\"\n$3", name.data(), root_->TraceId(), + root_->SpanId(), GetStackTrace()); + DCHECK(current_child_) << "Cannot add event when child span is not active."; + } +} // function AddChildSpanEvent + +void SpanManager::StartChildSpanInit() { + lock_guard<mutex> l(child_span_mu_); + ChildSpanBuilder(ChildSpanType::INIT, + { + {ATTR_CLUSTER_ID, FLAGS_cluster_id}, + {ATTR_DEFAULT_DB, client_request_state_->default_db()}, + {ATTR_QUERY_ID, query_id_}, + {ATTR_QUERY_STRING, client_request_state_->redacted_sql()}, + {ATTR_REQUEST_POOL, client_request_state_->request_pool()}, + {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())}, + {ATTR_USER_NAME, client_request_state_->effective_user()} + }); +} // function StartChildSpanInit + +void SpanManager::EndChildSpanInit() { + lock_guard<mutex> l(child_span_mu_); + DoEndChildSpanInit(); +} // function EndChildSpanInit + +void SpanManager::DoEndChildSpanInit(const Status* cause) { + DCHECK_CHILD_SPAN_TYPE(ChildSpanType::INIT); + + EndChildSpan( + cause, + OtelAttributesMap{ + {ATTR_ORIGINAL_QUERY_ID, (client_request_state_->IsRetriedQuery() ? 
+ PrintId(client_request_state_->original_id()) : "")} + }); +} // function DoEndChildSpanInit + +void SpanManager::StartChildSpanSubmitted() { + lock_guard<mutex> l(child_span_mu_); + ChildSpanBuilder(ChildSpanType::SUBMITTED); +} // function StartChildSpanSubmitted + +void SpanManager::EndChildSpanSubmitted() { + lock_guard<mutex> l(child_span_mu_); + DoEndChildSpanSubmitted(); +} // function EndChildSpanSubmitted + +void SpanManager::DoEndChildSpanSubmitted(const Status* cause) { + DCHECK_CHILD_SPAN_TYPE(ChildSpanType::SUBMITTED); + EndChildSpan(cause); +} // function DoEndChildSpanSubmitted + +void SpanManager::StartChildSpanPlanning() { + lock_guard<mutex> l(child_span_mu_); + ChildSpanBuilder(ChildSpanType::PLANNING); +} // function StartChildSpanPlanning + +void SpanManager::EndChildSpanPlanning() { + lock_guard<mutex> l(child_span_mu_); + DoEndChildSpanPlanning(); +} // function EndChildSpanPlanning + +void SpanManager::DoEndChildSpanPlanning(const Status* cause) { + DCHECK_CHILD_SPAN_TYPE(ChildSpanType::PLANNING); + EndChildSpan( + cause, + OtelAttributesMap{ + {ATTR_QUERY_TYPE, to_string(client_request_state_->exec_request().stmt_type)} + }); +} // function DoEndChildSpanPlanning + +void SpanManager::StartChildSpanAdmissionControl() { + lock_guard<mutex> l(child_span_mu_); + ChildSpanBuilder(ChildSpanType::ADMISSION_CONTROL, + {{ATTR_REQUEST_POOL, client_request_state_->request_pool()}}); +} // function StartChildSpanAdmissionControl + +void SpanManager::EndChildSpanAdmissionControl() { + lock_guard<mutex> l(child_span_mu_); + DoEndChildSpanAdmissionControl(); +} // function EndChildSpanAdmissionControl + +void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) { + if (IsClosing()) { + // If we are already closing, silently return as some cases (such as FIRST_FETCH) + // will end the admission control phase even though the query already finished. 
+ return; // <-- EARLY RETURN + } + + DCHECK_CHILD_SPAN_TYPE(ChildSpanType::ADMISSION_CONTROL); + + const bool was_queued = client_request_state_->admission_control_client()->WasQueued(); + + EndChildSpan( + cause, + OtelAttributesMap{ + {ATTR_QUEUED, was_queued}, + {ATTR_ADM_RESULT, + *client_request_state_->summary_profile()->GetInfoString("Admission result")} + }); +} // function DoEndChildSpanAdmissionControl + +void SpanManager::StartChildSpanQueryExecution() { + lock_guard<mutex> l(child_span_mu_); + + if (IsClosing()) { + // If we are already closing, silently return as some cases (such as FIRST_FETCH) + // will start the query execution phase even though the query already failed. + return; // <-- EARLY RETURN + } + + ChildSpanBuilder(ChildSpanType::QUERY_EXEC, {}, true); +} // function StartChildSpanQueryExecution + +void SpanManager::EndChildSpanQueryExecution() { + lock_guard<mutex> l(child_span_mu_); + DoEndChildSpanQueryExecution(); +} // function EndChildSpanQueryExecution + +void SpanManager::DoEndChildSpanQueryExecution(const Status* cause) { + if (IsClosing()) { + // If we are already closing, silently return as some cases (such as FIRST_FETCH) + // will end the query execution phase even though the query already failed. 
+ return; // <-- EARLY RETURN + } + + DCHECK_CHILD_SPAN_TYPE(ChildSpanType::QUERY_EXEC); + OtelAttributesMap attrs; + + if (client_request_state_->exec_request().stmt_type == TStmtType::QUERY) { + attrs.emplace(ATTR_NUM_DELETED_ROWS, static_cast<int64_t>(0)); + attrs.emplace(ATTR_NUM_MODIFIED_ROWS, static_cast<int64_t>(0)); + } else { + attrs.emplace(ATTR_NUM_DELETED_ROWS, static_cast<int64_t>(-1)); + attrs.emplace(ATTR_NUM_MODIFIED_ROWS, static_cast<int64_t>(-1)); + } + + attrs.emplace(ATTR_NUM_ROWS_FETCHED, client_request_state_->num_rows_fetched()); + + EndChildSpan(cause, attrs); +} // function DoEndChildSpanQueryExecution + +void SpanManager::StartChildSpanClose(const Status* cause) { + lock_guard<mutex> l(child_span_mu_); + + // In an error scenario, another child span may still be active since the normal code + // path was interrupted and thus the correct end child span function was not called. + // In this case, we must first end the current child span. + EndActiveChildSpan(cause); + + ChildSpanBuilder(ChildSpanType::CLOSE); +} // function StartChildSpanClose + +void SpanManager::EndChildSpanClose() { + lock_guard<mutex> l(child_span_mu_); + DCHECK_CHILD_SPAN_TYPE(ChildSpanType::CLOSE); + EndChildSpan(); + + // Set all root span attributes to avoid dereferencing the client_request_state_ in the + // dtor (as the dtor is invoked when client_request_state_ is destroyed). 
+ root_->SetAttribute(ATTR_QUERY_TYPE, + to_string(client_request_state_->exec_request().stmt_type)); + + if (client_request_state_->query_status().ok()) { + root_->SetAttributeEmpty(ATTR_ERROR_MESSAGE); + } else { + string error_msg = client_request_state_->query_status().msg().msg(); + + for (const auto& detail : client_request_state_->query_status().msg().details()) { + error_msg += "\n" + detail; + } + + root_->SetAttribute(ATTR_ERROR_MESSAGE, error_msg); + } + + if (UNLIKELY(client_request_state_->WasRetried())) { + root_->SetAttribute(ATTR_STATE, ClientRequestState::RetryStateToString( + client_request_state_->retry_state())); + root_->SetAttribute(ATTR_RETRIED_QUERY_ID, + PrintId(client_request_state_->retried_id())); + } else { + root_->SetAttribute(ATTR_STATE, + ClientRequestState::ExecStateToString(client_request_state_->exec_state())); + root_->SetAttributeEmpty(ATTR_RETRIED_QUERY_ID); + } + + if (UNLIKELY(client_request_state_->IsRetriedQuery())) { + root_->SetAttribute(ATTR_ORIGINAL_QUERY_ID, + PrintId(client_request_state_->original_id())); + } else { + root_->SetAttributeEmpty(ATTR_ORIGINAL_QUERY_ID); + } +} // function EndChildSpanClose + +void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type, + OtelAttributesMap&& additional_attributes, bool running) { + DCHECK(client_request_state_ != nullptr) << "Cannot start child span without a valid " + "client request state."; + DCHECK(span_type != ChildSpanType::NONE) << "Span type cannot be " << span_type << "."; + + if (UNLIKELY(current_child_)) { + LOG(WARNING) << strings::Substitute("Attempted to start child span '$0' while " + "another child span '$1' is still active trace_id=\"$2\" span_id=\"$3\"\n$4", + to_string(span_type), to_string(child_span_type_), root_->TraceId(), + root_->SpanId(), GetStackTrace()); + DCHECK(!current_child_) << "Cannot start a new child span while one is already " + << "active."; + + EndActiveChildSpan(); + } + + const string full_span_name = query_id_ + " - " + 
to_string(span_type); + + additional_attributes.insert_or_assign(ATTR_NAME, full_span_name); + additional_attributes.insert_or_assign(ATTR_RUNNING, running); + + current_child_ = make_unique<TimedSpan>(tracer_, full_span_name, ATTR_BEGIN_TIME, + ATTR_ELAPSED_TIME, std::move(additional_attributes), trace::SpanKind::kInternal, + root_); + child_span_type_ = span_type; + + debug_log_span(current_child_.get(), to_string(span_type), query_id_, true); +} // function ChildSpanBuilder + +void SpanManager::EndChildSpan(const Status* cause, + const OtelAttributesMap& additional_attributes) { + DCHECK(client_request_state_ != nullptr) << "Cannot end child span without a valid " + "client request state."; + + + if (LIKELY(current_child_)) { + for (const auto& a : additional_attributes) { + current_child_->SetAttribute(a.first, a.second); + } + + current_child_->SetAttribute(ATTR_STATUS, + ClientRequestState::ExecStateToString(client_request_state_->exec_state())); + + const Status* query_status; + if (cause != nullptr) { + query_status = cause; + } else { + query_status = &client_request_state_->query_status(); + } + + if (query_status->ok()) { + current_child_->SetAttributeEmpty(ATTR_ERROR_MSG); + } else { + string error_msg = query_status->msg().msg(); + + for (const auto& detail : query_status->msg().details()) { + error_msg += "\n" + detail; + } + + current_child_->SetAttribute(ATTR_ERROR_MSG, error_msg); + } + + current_child_->End(); + + debug_log_span(current_child_.get(), to_string(child_span_type_), query_id_, false); + + current_child_.reset(); + child_span_type_ = ChildSpanType::NONE; + } else { + LOG(WARNING) << strings::Substitute("Attempted to end a non-active child span " + "trace_id=\"$0\" span_id=\"$1\"\n$2", root_->TraceId(), root_->SpanId(), + GetStackTrace()); + DCHECK(current_child_) << "Cannot end child span when one is not active."; + } +} // function EndChildSpan + +void SpanManager::EndActiveChildSpan(const Status* cause) { + switch (child_span_type_) 
{ + case ChildSpanType::INIT: + DoEndChildSpanInit(cause); + break; + case ChildSpanType::SUBMITTED: + DoEndChildSpanSubmitted(cause); + break; + case ChildSpanType::PLANNING: + DoEndChildSpanPlanning(cause); + break; + case ChildSpanType::ADMISSION_CONTROL: + DoEndChildSpanAdmissionControl(cause); + break; + case ChildSpanType::QUERY_EXEC: + DoEndChildSpanQueryExecution(cause); + break; + case ChildSpanType::CLOSE: + // If we are already in a Close child span, we cannot start a new one. + LOG(WARNING) << "Attempted to start Close child span while another Close child " + "span is already active trace_id=\"$0\" span_id=\"$1\"\n$2", root_->TraceId(), + current_child_->SpanId(), GetStackTrace(); + DCHECK(false) << "Cannot start a new Close child span while a Close child span is " + "already active."; + break; + default: + // No-op, no active child span to end. + break; + } +} // function EndActiveChildSpan + +} // namespace impala \ No newline at end of file diff --git a/be/src/observe/span-manager.h b/be/src/observe/span-manager.h new file mode 100644 index 000000000..969a11f40 --- /dev/null +++ b/be/src/observe/span-manager.h @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// Manages the root and child spans for a single query. Provides convenience methods to
// start each child span with the appropriate name/attributes and to end each child span.
// Only one child span can be active at a time.
//
// The root span is started when the SpanManager is constructed and ended when the
// SpanManager is destructed. The root span is made the active span for the duration of
// the SpanManager's lifetime.
//
// All public functions acquire child_span_mu_ themselves; the private helpers expect
// the caller to already hold it.
class SpanManager {
public:
  // Starts the root span, named after the query id, and makes it the active span.
  // Parameters:
  //   tracer               -- Tracer used to create the root span and all child spans.
  //   client_request_state -- State of the query being traced. Must not be null. It is
  //                           dereferenced when child spans are started and ended, but
  //                           not in the destructor.
  SpanManager(
      opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer,
      const ClientRequestState* client_request_state);

  // Ends the root span, releases the active-span scope, and closes the tracer.
  ~SpanManager();

  // Adds an event to the currently active child span. If no child span is active,
  // logs a warning, fails a DCHECK (debug builds), and does nothing else.
  // Parameters:
  //   name -- The name of the event to add.
  void AddChildSpanEvent(const opentelemetry::nostd::string_view& name);

  // Functions to start child spans. If another child span is active, it will be ended,
  // a warning will be logged, and a DCHECK failed.
  // StartChildSpanQueryExecution() is the exception: it returns silently, without
  // starting a span, when the Close child span is already active.
  void StartChildSpanInit();
  void StartChildSpanSubmitted();
  void StartChildSpanPlanning();
  void StartChildSpanAdmissionControl();
  void StartChildSpanQueryExecution();

  // Starts the Close child span. If another child span is active, closes that span first
  // without logging any warnings or failing any DCHECKs. This behavior is different from
  // other start child span functions since the Close span will be started in error
  // scenarios where another child span may not have been closed due to exiting the code
  // happy path early.
  //
  // Parameters:
  //   cause - Pointer to the Status that caused the Close span to be started. If null,
  //           the query status from the client_request_state_ will be used. This
  //           parameter exists for error scenarios where the query status in the
  //           client_request_state_ will not yet be updated with the error.
  void StartChildSpanClose(const Status* cause = nullptr);

  // Functions to end child spans. If no child span is active, logs a warning, fails a
  // DCHECK (debug builds), and does nothing else.
  // EndChildSpanAdmissionControl() and EndChildSpanQueryExecution() return silently when
  // the Close child span is already active. EndChildSpanClose() additionally writes the
  // final query attributes onto the root span.
  void EndChildSpanInit();
  void EndChildSpanSubmitted();
  void EndChildSpanPlanning();
  void EndChildSpanAdmissionControl();
  void EndChildSpanQueryExecution();
  void EndChildSpanClose();

private:
  // Tracer instance used to construct spans.
  opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer_;

  // Scope to make the root span active and to deactivate the root span when it finishes.
  std::unique_ptr<opentelemetry::trace::Scope> scope_;

  // ClientRequestState for the query this SpanManager is tracking.
  const ClientRequestState* client_request_state_;

  // String representation of the Query ID for the query this SpanManager is tracking.
  // Computed once in the constructor.
  const std::string query_id_;

  // TimedSpan instance for the root span. Only modified in the ctor, dtor, and
  // EndChildSpanClose functions.
  std::shared_ptr<TimedSpan> root_;

  // TimedSpan instance for the current child span and the mutex to protect it.
  std::unique_ptr<TimedSpan> current_child_;
  std::mutex child_span_mu_;

  // Span type of the current child span. Will be ChildSpanType::NONE if no child span
  // is active.
  ChildSpanType child_span_type_;

  // Helper method that builds a child span and populates it with common attributes plus
  // the specified additional attributes. Does not take ownership of the child span mutex.
  // Callers must already hold the child_span_mu_ lock.
  //
  // Parameters:
  //   span_type             - Type of child span to start. Must not be NONE.
  //   additional_attributes - Span specific attributes set when the span is created.
  //   running               - Value of the "Running" attribute on the new span.
  void ChildSpanBuilder(const ChildSpanType& span_type,
      OtelAttributesMap&& additional_attributes = {}, bool running = false);

  // Internal helper functions to perform the actual work of ending child spans.
  // Callers must already hold the child_span_mu_ lock.
  //
  // Parameters:
  //   cause - See comments on StartChildSpanClose().
  void DoEndChildSpanInit(const Status* cause = nullptr);
  void DoEndChildSpanSubmitted(const Status* cause = nullptr);
  void DoEndChildSpanPlanning(const Status* cause = nullptr);
  void DoEndChildSpanAdmissionControl(const Status* cause = nullptr);
  void DoEndChildSpanQueryExecution(const Status* cause = nullptr);

  // Properly closes the active child span by calling the appropriate End method for the
  // active child span type. If no child span is active, does nothing.
  // Callers must already hold the child_span_mu_ lock.
  //
  // Parameters:
  //   cause - See comments on StartChildSpanClose().
  void EndActiveChildSpan(const Status* cause = nullptr);

  // Helper method to end a child span and populate its common attributes.
  // Callers must already hold the child_span_mu_ lock.
  //
  // Parameters:
  //   cause                 - See comments on StartChildSpanClose().
  //   additional_attributes - Span specific attributes that will be set on the span
  //                           before ending it.
  void EndChildSpan(const Status* cause = nullptr,
      const OtelAttributesMap& additional_attributes = {});

  // Returns true if the Close child span is active.
  // Callers must already hold the child_span_mu_ lock.
  inline bool IsClosing() {
    return UNLIKELY(current_child_ && child_span_type_ == ChildSpanType::CLOSE);
  }
}; // class SpanManager
+static int64_t get_epoch_milliseconds() { + auto now = chrono::system_clock::now(); + return chrono::duration_cast<chrono::milliseconds>(now.time_since_epoch()).count(); +} // function get_epoch_milliseconds + +// Helper function to convert an OpenTelemetry ID to a string representation. +template <size_t N> +static string id_to_string(const nostd::span<const uint8_t, N> id) { + DCHECK(N == 8 || N == 16) << "id_to_string only supports 8 or 16 byte IDs"; + + ostringstream oss; + for (auto byte : id) { + oss << hex << setw(2) << setfill('0') << static_cast<int>(byte); + } + + return oss.str(); +} // function id_to_string + +TimedSpan::TimedSpan(const nostd::shared_ptr<trace::Tracer>& tracer, + const nostd::string_view& name, const nostd::string_view& start_time_attribute_name, + const nostd::string_view& duration_attribute_name, OtelAttributesMap&& attributes, + trace::SpanKind span_kind, const shared_ptr<TimedSpan>& root) : + start_time_attribute_name_(start_time_attribute_name), + duration_attribute_name_(duration_attribute_name), + start_time_(get_epoch_milliseconds()) { + + trace::StartSpanOptions options; + options.kind = span_kind; + if (root) { + options.parent = root->span_->GetContext(); + } else { + options.parent = context::Context().SetValue(trace::kIsRootSpanKey, true); + } + + attributes.insert_or_assign(start_time_attribute_name_, + static_cast<int64_t>(start_time_)); + + span_ = tracer->StartSpan( + name, + attributes, + options); + + trace_id_ = id_to_string(span_->GetContext().trace_id().Id()); + span_id_ = id_to_string(span_->GetContext().span_id().Id()); +} // constructor TimedSpan + +void TimedSpan::End() { + const int64_t end_time = get_epoch_milliseconds(); + + span_->SetAttribute("EndTime", end_time); + span_->SetAttribute(duration_attribute_name_, (end_time - start_time_)); + + span_->End(); +} // function End + +void TimedSpan::SetAttribute(const nostd::string_view& key, + const common::AttributeValue& value) noexcept { + 
span_->SetAttribute(key, value); +} // function SetAttribute + +void TimedSpan::SetAttributeEmpty(const nostd::string_view& key) noexcept { + this->SetAttribute(key, ""); +} // function SetAttributeEmpty + +void TimedSpan::AddEvent(const nostd::string_view& name, const OtelAttributesMap& + additional_attributes) noexcept { + span_->AddEvent(name, additional_attributes); +} // function AddEvent + +trace::Scope TimedSpan::SetActive() { + return trace::Tracer::WithActiveSpan(span_); +} // function SetActive + +const string& TimedSpan::SpanId() const { + return span_id_; +} // function SpanId + +const string& TimedSpan::TraceId() const { + return trace_id_; +} // function TraceId + +} // namespace impala \ No newline at end of file diff --git a/be/src/observe/timed-span.h b/be/src/observe/timed-span.h new file mode 100644 index 000000000..bc12398b8 --- /dev/null +++ b/be/src/observe/timed-span.h @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// Proxy class for an OpenTelemetry Span that automatically adds attributes for start
// time, end time, and total span duration. Times are recorded as milliseconds since the
// epoch taken from the system clock, and the duration is their difference.
class TimedSpan {
public:
  // Initializes and starts a new span with the given name and attribute names.
  // Parameters:
  //   tracer -- The OpenTelemetry tracer to use to create the span. Not stored, only used
  //             during construction.
  //   name -- The name of the span.
  //   start_time_attribute_name -- The name of the attribute that contains the span start
  //                                time.
  //   duration_attribute_name -- The name of the attribute that contains the span
  //                              duration.
  //   attributes -- A map of attributes to set on the span at creation time. The start
  //                 time attribute is added to this map before the span is started.
  //   span_kind -- The kind of span. Default is INTERNAL.
  //   root -- Optional parent. When non-null, the new span is created as a child of
  //           root's span. When null, the new span is started as a root span.
  //
  // NOTE(review): the two attribute names are kept as string_view members, not copied,
  // and the duration name is read again in End(). Presumably the referenced character
  // data must outlive this object -- confirm for callers that do not pass literals.
  TimedSpan(const opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer>& tracer,
      const opentelemetry::nostd::string_view& name,
      const opentelemetry::nostd::string_view& start_time_attribute_name,
      const opentelemetry::nostd::string_view& duration_attribute_name,
      OtelAttributesMap&& attributes,
      opentelemetry::trace::SpanKind span_kind =
          opentelemetry::trace::SpanKind::kInternal,
      const std::shared_ptr<TimedSpan>& root = nullptr);

  // Sets the "EndTime" and duration attributes, then ends the underlying span.
  void End();

  // Set any attribute on the underlying span.
  void SetAttribute(const opentelemetry::nostd::string_view& key,
      const opentelemetry::common::AttributeValue& value) noexcept;

  // Set any attribute on the underlying span with a value of an empty string.
  void SetAttributeEmpty(const opentelemetry::nostd::string_view& key) noexcept;

  // Adds an event with the given name, and optional attributes, to the underlying span.
  void AddEvent(const opentelemetry::nostd::string_view& name,
      const OtelAttributesMap& additional_attributes = {}) noexcept;

  // Provides a scope that sets this span as the currently active span.
  opentelemetry::trace::Scope SetActive();

  // Returns the span id of the underlying span as a hexadecimal string. The string is
  // computed once in the constructor; the returned reference is valid for the lifetime
  // of this object.
  const std::string& SpanId() const;

  // Returns the trace id of the underlying span as a hexadecimal string. The string is
  // computed once in the constructor; the returned reference is valid for the lifetime
  // of this object.
  const std::string& TraceId() const;

  // Returns the OpenTelemetry span context.
  opentelemetry::trace::SpanContext GetContext() const { return span_->GetContext(); }

private:
  // Attribute names supplied at construction (non-owning views).
  const opentelemetry::nostd::string_view start_time_attribute_name_;
  const opentelemetry::nostd::string_view duration_attribute_name_;

  // Span start time in milliseconds since the epoch; used by End() to compute duration.
  const int64_t start_time_;

  // The wrapped OpenTelemetry span and the cached string forms of its ids.
  opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
  std::string span_id_;
  std::string trace_id_;

}; // class TimedSpan
#include "runtime/coordinator.h" - #include <cerrno> #include <iomanip> #include <list> @@ -40,6 +39,7 @@ #include "gen-cpp/admission_control_service.pb.h" #include "kudu/rpc/rpc_context.h" #include "kudu/rpc/rpc_sidecar.h" +#include "observe/otel.h" #include "runtime/coordinator-backend-state.h" #include "runtime/coordinator-filter-state.h" #include "runtime/debug-options.h" @@ -533,6 +533,9 @@ Status Coordinator::StartBackendExec() { query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends)); parent_query_driver_->SetExecTimeLimit(parent_request_state_); + if (parent_request_state_->otel_trace_query()) { + parent_request_state_->otel_span_manager()->StartChildSpanQueryExecution(); + } // Serialize the TQueryCtx once and pass it to each backend. The serialized buffer must // stay valid until WaitOnExecRpcs() has returned. ThriftSerializer serializer(true); @@ -593,6 +596,10 @@ Status Coordinator::StartBackendExec() { query_events_->MarkEvent( Substitute("All $0 execution backends ($1 fragment instances) started", num_backends, exec_params_.GetNumFragmentInstances())); + + if (parent_request_state_->otel_trace_query()) { + parent_request_state_->otel_span_manager()->AddChildSpanEvent("AllBackendsStarted"); + } return Status::OK(); } @@ -1016,6 +1023,9 @@ Status Coordinator::Wait() { RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS)); query_profile_->AddInfoString( "DML Stats", dml_exec_state_.OutputPartitionStats("\n")); + if (parent_request_state_->otel_trace_query()) { + parent_request_state_->otel_span_manager()->AddChildSpanEvent("LastRowFetched"); + } return Status::OK(); } @@ -1053,6 +1063,9 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos, if (!first_row_fetched_ && results->size() > 0) { query_events_->MarkEvent(Coordinator::PROFILE_EVENT_LABEL_FIRST_ROW_FETCHED); first_row_fetched_ = true; + if (parent_request_state_->otel_trace_query()) { + 
parent_request_state_->otel_span_manager()->AddChildSpanEvent("FirstRowFetched"); + } } RETURN_IF_ERROR(UpdateExecState( status, &runtime_state->fragment_instance_id(), FLAGS_hostname)); @@ -1470,6 +1483,10 @@ void Coordinator::ReleaseQueryAdmissionControlResources() { admission_control_client->ReleaseQuery( ComputeQueryResourceUtilization().peak_per_host_mem_consumption); query_events_->MarkEvent("Released admission control resources"); + if (parent_request_state_->otel_trace_query()) { + parent_request_state_->otel_span_manager()->AddChildSpanEvent( + "ReleasedAdmissionControlResources"); + } } void Coordinator::ReleaseBackendAdmissionControlResources( diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 9bfa4beab..0e32e1ea8 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -30,6 +30,7 @@ #include "exec/kudu/kudu-util.h" #include "exprs/ai-functions.h" #include "kudu/rpc/service_if.h" +#include "observe/otel.h" #include "rpc/rpc-mgr.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" @@ -344,6 +345,12 @@ Status ExecEnv::InitForFeSupport() { Status ExecEnv::Init() { LOG(INFO) << "Initializing impalad with backend uuid: " << PrintId(backend_id_); + + // Initialize OTel + if (FLAGS_is_coordinator && otel_trace_enabled()) { + RETURN_IF_ERROR(init_otel_tracer()); + } + // Initialize thread pools if (FLAGS_is_coordinator) { RETURN_IF_ERROR(hdfs_op_thread_pool_->Init()); diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc index 810bb9e43..6fb584cc5 100644 --- a/be/src/runtime/query-driver.cc +++ b/be/src/runtime/query-driver.cc @@ -483,6 +483,10 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st if (retry_exec_request_->__isset.result_set_metadata) { (*retry_request_state)->set_result_metadata(retry_exec_request_->result_set_metadata); } + + if ((*retry_request_state)->otel_trace_query()) { + 
(*retry_request_state)->otel_span_manager()->EndChildSpanInit(); + } } void QueryDriver::HandleRetryFailure(Status* status, string* error_msg, diff --git a/be/src/scheduling/admission-control-client.h b/be/src/scheduling/admission-control-client.h index 9c077dd2b..b95f1aa3c 100644 --- a/be/src/scheduling/admission-control-client.h +++ b/be/src/scheduling/admission-control-client.h @@ -24,6 +24,7 @@ #include "gen-cpp/Types_types.h" #include "gen-cpp/admission_control_service.pb.h" #include "gen-cpp/common.pb.h" +#include "observe/span-manager.h" #include "scheduling/admission-controller.h" namespace impala { @@ -46,7 +47,7 @@ class AdmissionControlClient { virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request, RuntimeProfile::EventSequence* query_events, std::unique_ptr<QuerySchedulePB>* schedule_result, - int64_t* wait_start_time_ms, int64_t* wait_end_time_ms) = 0; + int64_t* wait_start_time_ms, int64_t* wait_end_time_ms, SpanManager* span_mgr) = 0; // Called when the query has completed to release all of its resources. virtual void ReleaseQuery(int64_t peak_mem_consumption) = 0; @@ -57,6 +58,9 @@ class AdmissionControlClient { // Called to cancel admission for the query. virtual void CancelAdmission() = 0; + + // Indicates if the query was queued or not. 
+ virtual bool WasQueued() const = 0; }; } // namespace impala diff --git a/be/src/scheduling/local-admission-control-client.cc b/be/src/scheduling/local-admission-control-client.cc index 94f9b84f0..ed5ef16f2 100644 --- a/be/src/scheduling/local-admission-control-client.cc +++ b/be/src/scheduling/local-admission-control-client.cc @@ -17,6 +17,7 @@ #include "scheduling/local-admission-control-client.h" +#include "observe/span-manager.h" #include "runtime/exec-env.h" #include "util/runtime-profile-counters.h" #include "util/uid-util.h" @@ -25,7 +26,8 @@ namespace impala { -LocalAdmissionControlClient::LocalAdmissionControlClient(const TUniqueId& query_id) { +LocalAdmissionControlClient::LocalAdmissionControlClient(const TUniqueId& query_id) + : was_queued_(false) { TUniqueIdToUniqueIdPB(query_id, &query_id_); } @@ -33,7 +35,7 @@ Status LocalAdmissionControlClient::SubmitForAdmission( const AdmissionController::AdmissionRequest& request, RuntimeProfile::EventSequence* query_events, std::unique_ptr<QuerySchedulePB>* schedule_result, - int64_t* wait_start_time_ms, int64_t* wait_end_time_ms) { + int64_t* wait_start_time_ms, int64_t* wait_end_time_ms, SpanManager* span_mgr) { ScopedEvent completedEvent( query_events, AdmissionControlClient::QUERY_EVENT_COMPLETED_ADMISSION); query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION); @@ -41,6 +43,10 @@ Status LocalAdmissionControlClient::SubmitForAdmission( Status status = ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission( request, &admit_outcome_, schedule_result, queued); if (queued) { + was_queued_ = true; + if (span_mgr != nullptr) { + span_mgr->AddChildSpanEvent(QUERY_EVENT_QUEUED); + } query_events->MarkEvent(QUERY_EVENT_QUEUED); DCHECK(status.ok()); status = ExecEnv::GetInstance()->admission_controller()->WaitOnQueued( diff --git a/be/src/scheduling/local-admission-control-client.h b/be/src/scheduling/local-admission-control-client.h index 7d675272e..9079af722 100644 --- 
a/be/src/scheduling/local-admission-control-client.h +++ b/be/src/scheduling/local-admission-control-client.h @@ -38,16 +38,22 @@ class LocalAdmissionControlClient : public AdmissionControlClient { virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request, RuntimeProfile::EventSequence* query_events, std::unique_ptr<QuerySchedulePB>* schedule_result, - int64_t* wait_start_time_ms, int64_t* wait_end_time_ms) override; + int64_t* wait_start_time_ms, int64_t* wait_end_time_ms, + SpanManager* span_mgr) override; virtual void ReleaseQuery(int64_t peak_mem_consumption) override; virtual void ReleaseQueryBackends( const std::vector<NetworkAddressPB>& host_addr) override; virtual void CancelAdmission() override; + bool WasQueued() const override { return was_queued_; } + private: // The id of the query being considered for admission. UniqueIdPB query_id_; + // Whether or not the query was queued. + bool was_queued_; + /// Promise used by the admission controller. AdmissionController:SubmitForAdmission() /// will block on this promise until the query is either rejected, admitted, times out, /// or is cancelled. 
Can be set to CANCELLED by CancelAdmission() in order to cancel, diff --git a/be/src/scheduling/remote-admission-control-client.cc b/be/src/scheduling/remote-admission-control-client.cc index accc06ca1..5c4c09056 100644 --- a/be/src/scheduling/remote-admission-control-client.cc +++ b/be/src/scheduling/remote-admission-control-client.cc @@ -47,7 +47,7 @@ using namespace kudu::rpc; namespace impala { RemoteAdmissionControlClient::RemoteAdmissionControlClient(const TQueryCtx& query_ctx) - : query_ctx_(query_ctx) { + : query_ctx_(query_ctx), was_queued_(false) { TUniqueIdToUniqueIdPB(query_ctx.query_id, &query_id_); } @@ -98,7 +98,7 @@ Status RemoteAdmissionControlClient::SubmitForAdmission( const AdmissionController::AdmissionRequest& request, RuntimeProfile::EventSequence* query_events, std::unique_ptr<QuerySchedulePB>* schedule_result, - int64_t* wait_start_time_ms, int64_t* wait_end_time_ms) { + int64_t* wait_start_time_ms, int64_t* wait_end_time_ms, SpanManager* span_mgr) { ScopedEvent completedEvent( query_events, AdmissionControlClient::QUERY_EVENT_COMPLETED_ADMISSION); @@ -145,7 +145,6 @@ Status RemoteAdmissionControlClient::SubmitForAdmission( KUDU_RETURN_IF_ERROR(admit_rpc_status, "AdmitQuery rpc failed"); RETURN_IF_ERROR(admit_status); - bool is_query_queued = false; while (true) { RpcController rpc_controller2; GetQueryStatusRequestPB get_status_req; @@ -179,9 +178,12 @@ Status RemoteAdmissionControlClient::SubmitForAdmission( break; } - if (!is_query_queued) { + if (!was_queued_) { query_events->MarkEvent(QUERY_EVENT_QUEUED); - is_query_queued = true; + if (span_mgr != nullptr) { + span_mgr->AddChildSpanEvent(QUERY_EVENT_QUEUED); + } + was_queued_ = true; } SleepForMs(FLAGS_admission_status_retry_time_ms); diff --git a/be/src/scheduling/remote-admission-control-client.h b/be/src/scheduling/remote-admission-control-client.h index 48881a5f3..7011fa247 100644 --- a/be/src/scheduling/remote-admission-control-client.h +++ 
b/be/src/scheduling/remote-admission-control-client.h @@ -54,12 +54,15 @@ class RemoteAdmissionControlClient : public AdmissionControlClient { virtual Status SubmitForAdmission(const AdmissionController::AdmissionRequest& request, RuntimeProfile::EventSequence* query_events, std::unique_ptr<QuerySchedulePB>* schedule_result, - int64_t* wait_start_time_ms, int64_t* wait_end_time_ms) override; + int64_t* wait_start_time_ms, int64_t* wait_end_time_ms, + SpanManager* span_mgr) override; virtual void ReleaseQuery(int64_t peak_mem_consumption) override; virtual void ReleaseQueryBackends( const std::vector<NetworkAddressPB>& host_addr) override; virtual void CancelAdmission() override; + bool WasQueued() const override { return was_queued_; } + private: // Owned by the ClientRequestState. const TQueryCtx& query_ctx_; @@ -78,6 +81,9 @@ class RemoteAdmissionControlClient : public AdmissionControlClient { /// subsequently, it will not send the AdmitQuery rpc bool cancelled_ = false; + // Whether or not the query was queued. + bool was_queued_; + /// Constants related to retrying the idempotent rpcs. 
static const int RPC_NUM_RETRIES = 3; static const int64_t RPC_TIMEOUT_MS = 10 * MILLIS_PER_SEC; diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 673047110..a5b30d36c 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -33,6 +33,8 @@ #include "exprs/timezone_db.h" #include "gen-cpp/Types_types.h" #include "kudu/rpc/rpc_controller.h" +#include "observe/otel.h" +#include "observe/span-manager.h" #include "rpc/rpc-mgr.inline.h" #include "runtime/coordinator.h" #include "runtime/exec-env.h" @@ -121,6 +123,14 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro start_time_us_(UnixMicros()), fetch_rows_timeout_us_(MICROS_PER_MILLI * query_options().fetch_rows_timeout_ms), parent_driver_(query_driver) { + + if (otel_trace_enabled() && should_otel_trace_query(sql_stmt().c_str())) { + // initialize OpenTelemetry for this query + VLOG(2) << "Initializing OpenTelemetry for query " << PrintId(query_id()); + otel_span_manager_ = build_span_manager(this); + otel_span_manager_->StartChildSpanInit(); + } + bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND; // "Impala Backend Timeline" was specifically chosen to exploit the lexicographical // ordering defined by the underlying std::map holding the timelines displayed in @@ -643,15 +653,25 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() { DCHECK(exec_req.__isset.query_exec_request); UniqueIdPB query_id_pb; TUniqueIdToUniqueIdPB(query_id(), &query_id_pb); + if (otel_trace_query()) { + otel_span_manager_->StartChildSpanAdmissionControl(); + } + Status admit_status = admission_control_client_->SubmitForAdmission( {query_id_pb, ExecEnv::GetInstance()->backend_id(), exec_req.query_exec_request, exec_req.query_options, summary_profile_, blacklisted_executor_addresses_}, - query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_); + query_events_, &schedule_, 
&wait_start_time_ms_, &wait_end_time_ms_, + otel_span_manager_.get()); { lock_guard<mutex> l(lock_); if (!UpdateQueryStatus(admit_status).ok()) return; } + + if (otel_trace_query()) { + otel_span_manager_->EndChildSpanAdmissionControl(); + } + DCHECK(schedule_.get() != nullptr); // Note that we don't need to check for cancellation between admission and query // startup. The query was not cancelled right before being admitted and the window here @@ -765,6 +785,9 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) { Status status = catalog_op_executor_->Exec(exec_req.catalog_op_request); query_events_->MarkEvent("CatalogDdlRequest finished"); + if (otel_trace_query()) { + otel_span_manager_->AddChildSpanEvent("UpdateCatalogFinished"); + } AddCatalogTimeline(); { lock_guard<mutex> l(lock_); @@ -1120,6 +1143,11 @@ Status ClientRequestState::ExecEventProcessorCmd() { } void ClientRequestState::Finalize(const Status* cause) { + if (otel_trace_query()) { + // No need to end previous child span since this function takes care of it. + otel_span_manager_->StartChildSpanClose(cause); + } + Cancel(cause, /*wait_until_finalized=*/true); MarkActive(); // Make sure we join on wait_thread_ before we finish (and especially before this object @@ -1174,6 +1202,10 @@ void ClientRequestState::Finalize(const Status* cause) { // Update the timeline here so that all of the above work is captured in the timeline. 
query_events_->MarkEvent("Unregister query"); UnRegisterRemainingRPCs(); + if (otel_trace_query()) { + otel_span_manager_->AddChildSpanEvent("QueryUnregistered"); + otel_span_manager_->EndChildSpanClose(); + } } Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) { @@ -1230,6 +1262,9 @@ void ClientRequestState::Wait() { lock_guard<mutex> l(lock_); if (returns_result_set()) { query_events()->MarkEvent("Rows available"); + if (otel_trace_query()) { + otel_span_manager_->AddChildSpanEvent("RowsAvailable"); + } } else { query_events()->MarkEvent("Request finished"); UpdateEndTime(); @@ -1699,6 +1734,9 @@ Status ClientRequestState::UpdateCatalog() { } } query_events_->MarkEvent("DML Metastore update finished"); + if (otel_trace_query()) { + otel_span_manager_->AddChildSpanEvent("MetastoreUpdateFinished"); + } return Status::OK(); } diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index c4d06c533..eb65da95d 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -21,6 +21,7 @@ #include "common/object-pool.h" #include "common/status.h" #include "exec/catalog-op-executor.h" +#include "observe/span-manager.h" #include "rpc/rpc-trace.h" #include "service/child-query.h" #include "service/impala-server.h" @@ -542,6 +543,13 @@ class ClientRequestState { void AddClientFetchLockWaitTime(int64_t lock_wait_time_ns) { client_fetch_lock_wait_timer_->Add(lock_wait_time_ns); } + + /// Returns the OpenTelemetry SpanManager for this query. + std::shared_ptr<SpanManager> otel_span_manager() { return otel_span_manager_; } + + /// Returns true if OpenTelemetry tracing is enabled for this query. + inline bool otel_trace_query() const { return otel_span_manager_.get() != nullptr; } + protected: /// Updates the end_time_us_ of this query if it isn't set. 
The end time is determined /// when this function is called for the first time, calling it multiple times does not @@ -1007,5 +1015,8 @@ class ClientRequestState { /// Try to ask other coordinators to kill query by sending the request. Status TryKillQueryRemotely( const TUniqueId& query_id, const KillQueryRequestPB& request); + + /// SpanManager instance for this query. + std::shared_ptr<SpanManager> otel_span_manager_; }; } diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 73191965c..ccd9a189b 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -57,6 +57,7 @@ #include "kudu/security/security_flags.h" #include "kudu/util/random_util.h" #include "kudu/util/version_util.h" +#include "observe/otel.h" #include "rpc/authentication.h" #include "rpc/rpc-mgr.h" #include "rpc/rpc-trace.h" @@ -256,6 +257,14 @@ const string SSL_MIN_VERSION_HELP = "The minimum SSL/TLS version that Thrift " "services should use for both client and server connections. Supported versions are " "TLSv1.0, TLSv1.1 and TLSv1.2 (as long as the system OpenSSL library supports them)"; DEFINE_string(ssl_minimum_version, "tlsv1.2", SSL_MIN_VERSION_HELP.c_str()); +DEFINE_validator(ssl_minimum_version, [](const char* flagname, const string& value) { + const std::string trimmed = boost::algorithm::trim_copy(value); + return boost::iequals(trimmed, "tlsv1") + || boost::iequals(trimmed, "tlsv1.0") + || boost::iequals(trimmed, "tlsv1.1") + || boost::iequals(trimmed, "tlsv1.2") + || boost::iequals(trimmed, "tlsv1.3"); +}); DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle" " for before it is closed (and all running queries cancelled) by Impala. If 0, idle" @@ -1335,6 +1344,11 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx, // ClientRequestState as well. 
QueryDriver::CreateNewDriver(this, query_handle, query_ctx, session_state); + if ((*query_handle)->otel_trace_query()) { + (*query_handle)->otel_span_manager()->EndChildSpanInit(); + (*query_handle)->otel_span_manager()->StartChildSpanSubmitted(); + } + bool is_external_req = external_exec_request != nullptr; if (is_external_req && external_exec_request->remote_submit_time) { @@ -1343,6 +1357,11 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx, (*query_handle)->query_events()->MarkEvent("Query submitted"); + if ((*query_handle)->otel_trace_query()) { + (*query_handle)->otel_span_manager()->EndChildSpanSubmitted(); + (*query_handle)->otel_span_manager()->StartChildSpanPlanning(); + } + { // Keep a lock on query_handle so that registration and setting // result_metadata are atomic. @@ -1391,6 +1410,11 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx, if (result.__isset.result_set_metadata) { (*query_handle)->set_result_metadata(result.result_set_metadata); } + + if ((*query_handle)->otel_trace_query()) { + (*query_handle)->otel_span_manager()->EndChildSpanPlanning(); + } + } VLOG(2) << "Execution request: " << ThriftDebugString((*query_handle)->exec_request()); @@ -3583,6 +3607,10 @@ bool ImpalaServer::CancelQueriesForGracefulShutdown() { // Clean up temporary files if needed. ExecEnv::GetInstance()->tmp_file_mgr()->CleanupAtShutdown(); + if (FLAGS_is_coordinator && otel_trace_enabled()) { + shutdown_otel_tracer(); + } + // Drain the completed queries queue to the query log table. if (FLAGS_enable_workload_mgmt) { ShutdownWorkloadManagement(); diff --git a/bin/impala-config.sh b/bin/impala-config.sh index 9db35f15a..cbdc27be7 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -81,13 +81,13 @@ export USE_AVRO_CPP=${USE_AVRO_CPP:=false} # moving to a different build of the toolchain, e.g. when a version is bumped or a # compile option is changed. 
The build id can be found in the output of the toolchain # build jobs, it is constructed from the build number and toolchain git hash prefix. -export IMPALA_TOOLCHAIN_BUILD_ID_AARCH64=112-7a2bc2334c -export IMPALA_TOOLCHAIN_BUILD_ID_X86_64=548-7a2bc2334c +export IMPALA_TOOLCHAIN_BUILD_ID_AARCH64=108-a38e3142e7 +export IMPALA_TOOLCHAIN_BUILD_ID_X86_64=541-a38e3142e7 export IMPALA_TOOLCHAIN_REPO=\ ${IMPALA_TOOLCHAIN_REPO:-https://github.com/cloudera/native-toolchain.git} export IMPALA_TOOLCHAIN_BRANCH=${IMPALA_TOOLCHAIN_BRANCH:-master} export IMPALA_TOOLCHAIN_COMMIT_HASH=\ -${IMPALA_TOOLCHAIN_COMMIT_HASH-7a2bc2334c55fc662bb45a9d4c77551bcc911271} +${IMPALA_TOOLCHAIN_COMMIT_HASH-a38e3142e70ce74cdf18c3527ece27835adaa58f} # Compare the build ref in build IDs by removing everything up-to-and-including the # first hyphen. if [ "${IMPALA_TOOLCHAIN_BUILD_ID_AARCH64#*-}" \ diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index 502a53eef..109ac15f6 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -671,3 +671,61 @@ class CustomClusterTestSuite(ImpalaTestSuite): """Override ImpalaTestSuite to return 1st impalad of custom cluster. Returns None if no impalad was started.""" return cls.cluster.impalads[0].service if cls.cluster.impalads else None + + def query_id_from_ui(self, section, match_func=None, match_query=None, coord_idx=0): + """ + Calls the debug UI's queries page and loops over all queries in the specified + section, testing each one against match_func or match_query. Returns a tuple of the + string id and the profile of the first query that matches. + + Parameters: + section: UI section where the query is located, valid values are + "completed_queries" or "in_flight_queries" + match_func: Function that takes a single argument (a dictionary) and returns + True if the query matches, False otherwise.
The dictionary parameter + keys mirror the JSON returned by calling the /queries?json endpoint + on a coordinator. Mutually exclusive with match_query. + match_query: String of the query statement to match (case-insensitive). Mutually + exclusive with match_func. + coord_idx: Index of the Impalad to use as the coordinator. This is used to + determine which impalad's UI to query. + + Returns: + Tuple of (query id string, query profile) for the first matching query. + Fails an assert if no matching query is found. + """ + assert section == "completed_queries" or section == "in_flight_queries" + assert match_func is not None or match_query is not None, \ + "Must specify either match_func or match_query" + assert match_func is None or match_query is None, \ + "Cannot specify both match_func and match_query" + + if match_query is not None: + match_query = match_query.lower().strip() + + service = self.cluster.impalads[coord_idx].service + queries_json = service.get_debug_webpage_json('/queries') + + for query in queries_json[section]: + if (match_query is not None and query["stmt"].lower() == match_query.lower()) \ + or (match_func is not None and match_func(query)): + query_id = query['query_id'] + return query_id, service.read_query_profile_page(query_id) + + assert False, "No matching query found in section '{}'".format(section) + + def query_profile_from_ui(self, query_id, coord_idx=0): + """ + Wrapper function around ImpaladService.read_query_profile_page() to fetch the query + profile for a given query id from the UI of the specified coordinator. + + Parameters: + query_id: String id of the query to fetch the profile for. + coord_idx: Index of the Impalad to use as the coordinator. This is used to + determine which impalad's UI to query. + + Returns: + String of the query profile.
+ """ + service = self.cluster.impalads[coord_idx].service + return service.read_query_profile_page(query_id) diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py index 45718c717..2bb17db53 100644 --- a/tests/common/file_utils.py +++ b/tests/common/file_utils.py @@ -28,6 +28,7 @@ from subprocess import check_call from tests.util.filesystem_utils import get_fs_path, WAREHOUSE_PREFIX from tests.util.iceberg_metadata_util import rewrite_metadata +from tests.util.retry import retry def create_iceberg_table_from_directory(impala_client, unique_database, table_name, @@ -199,3 +200,22 @@ def cleanup_tmp_test_dir(dir_path): """Remove temporary 'dir_path' and its content. Ignore errors upon deletion.""" shutil.rmtree(dir_path, ignore_errors=True) + + +def count_lines(file_path): + """Counts the number of lines in the file located at 'file_path'.""" + with open(file_path, 'rb') as file: + return sum(1 for _ in file.readlines()) + + +def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3, + sleep_time_s=1, backoff=2): + """Waits until the given file contains the expected number of lines or until the + timeout is reached. Fails an assert if the timeout is reached before the expected + number of lines is found.""" + def assert_trace_file_lines(): + return count_lines(file_path) == expected_line_count + + assert retry(assert_trace_file_lines, max_attempts, sleep_time_s, backoff), \ + "File '{}' did not reach expected line count of '{}'. actual line count: '{}'" \ + .format(file_path, expected_line_count, count_lines(file_path)) diff --git a/tests/custom_cluster/test_otel_trace.py b/tests/custom_cluster/test_otel_trace.py new file mode 100644 index 000000000..0a948d7f1 --- /dev/null +++ b/tests/custom_cluster/test_otel_trace.py @@ -0,0 +1,581 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.file_utils import wait_for_file_line_count +from tests.common.impala_connection import ERROR, RUNNING, FINISHED +from tests.common.test_vector import PROTOCOL, HS2, BEESWAX, ImpalaTestDimension +from tests.util.otel_trace import parse_trace_file, ATTR_VAL_TYPE_STRING, \ + ATTR_VAL_TYPE_INT, ATTR_VAL_TYPE_BOOL +from tests.util.query_profile_util import parse_db_user, parse_session_id, parse_sql, \ + parse_query_type, parse_query_status, parse_impala_query_state, parse_query_id, \ + parse_retry_status, parse_original_query_id, parse_retried_query_id, \ + parse_num_rows_fetched, parse_admission_result +from tests.util.retry import retry + + +class TestOtelTrace(CustomClusterTestSuite): + """Tests that exercise OpenTelemetry tracing behavior.""" + + OUT_DIR = "out_dir" + TRACE_FILE = "export-trace.jsonl" + + @classmethod + def add_test_dimensions(cls): + super(TestOtelTrace, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(PROTOCOL, HS2, BEESWAX)) + + def setup_method(self, method): + super(TestOtelTrace, self).setup_method(method) + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 
--cluster_id=otel_trace --otel_trace_enabled=true " + "--otel_trace_exporter=file --otel_file_flush_interval_ms=500 " + "--otel_file_pattern={out_dir}/" + TRACE_FILE, + cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) + def test_query_success(self, vector): + """Test that OpenTelemetry tracing is working by running a simple query and + checking that the trace file is created and contains spans.""" + query = "select count(*) from functional.alltypes" + result = self.execute_query_expect_success( + self.create_impala_client_from_vector(vector), query) + + self.__assert_trace(result.query_id, result.runtime_profile, "otel_trace") + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=test_invalid_sql " + "--otel_trace_enabled=true --otel_trace_exporter=file " + "--otel_file_flush_interval_ms=500 " + "--otel_file_pattern={out_dir}/" + TRACE_FILE, + cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) + def test_invalid_sql(self, vector): + query = "select * from functional.alltypes where field_does_not_exist=1" + self.execute_query_expect_failure(self.create_impala_client_from_vector(vector), + query) + + # Retrieve the query id and runtime profile from the UI since the query execute call + # only returns a HiveServer2Error object and not the query id or profile. 
+ query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=query) + + self.__assert_trace( + query_id=query_id, + query_profile=profile, + cluster_id="test_invalid_sql", + trace_cnt=1, + err_span="Planning", + missing_spans=["AdmissionControl", "QueryExecution"]) + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=test_retry_select_success " + "--otel_trace_enabled=true --otel_trace_exporter=file " + "--otel_file_flush_interval_ms=500 " + "--otel_file_pattern={out_dir}/" + TRACE_FILE, + cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], + disable_log_buffering=True, + statestored_args="-statestore_heartbeat_frequency_ms=60000") + def test_retry_select_success(self, vector): + query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50" + self.cluster.impalads[1].kill() + + result = self.execute_query_expect_success( + self.create_impala_client_from_vector(vector), query, + {"RETRY_FAILED_QUERIES": True}) + retried_query_id = parse_query_id(result.runtime_profile) + orig_query_profile = self.query_profile_from_ui(result.query_id) + + # Assert the trace from the original query. + self.__assert_trace( + query_id=result.query_id, + query_profile=orig_query_profile, + cluster_id="test_retry_select_success", + trace_cnt=2, + err_span="QueryExecution") + + # Assert the trace from the retried query. 
+ self.__assert_trace( + query_id=retried_query_id, + query_profile=result.runtime_profile, + cluster_id="test_retry_select_success", + trace_cnt=2, + missing_spans=["Submitted", "Planning"]) + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=test_retry_select_failed " + "--otel_trace_enabled=true --otel_trace_exporter=file " + "--otel_file_flush_interval_ms=500 " + "--otel_file_pattern={out_dir}/" + TRACE_FILE, + cluster_size=3, num_exclusive_coordinators=1, tmp_dir_placeholders=[OUT_DIR], + disable_log_buffering=True, + statestored_args="-statestore_heartbeat_frequency_ms=1000") + def test_retry_select_failed(self, vector): + # Shuffle heavy query. + query = "select * from tpch.lineitem t1, tpch.lineitem t2 where " \ + "t1.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey limit 1" + + vector.set_exec_option("retry_failed_queries", "true") + client = self.create_impala_client_from_vector(vector) + + # Launch a query, it should be retried. + handle = self.execute_query_async_using_client(client, query, vector) + client.wait_for_impala_state(handle, RUNNING, 60) + query_id = client.handle_id(handle) + self.cluster.impalads[1].kill() + + # Wait until the retry is running. + def __wait_until_retried(): + return parse_retry_status(self.query_profile_from_ui(query_id)) == "RETRIED" + retry(__wait_until_retried, 60, 1, 1, False) + + # Kill another impalad so that another retry is attempted. + self.cluster.impalads[2].kill() + + # Wait until the query fails. + client.wait_for_impala_state(handle, ERROR, 60) + + retried_query_profile = client.get_runtime_profile(handle) + retried_query_id = parse_query_id(retried_query_profile) + orig_query_profile = self.query_profile_from_ui(query_id) + + client.close_query(handle) + + # Assert the trace from the original query. 
+ self.__assert_trace( + query_id=query_id, + query_profile=orig_query_profile, + cluster_id="test_retry_select_failed", + trace_cnt=3, + err_span="QueryExecution") + + # Assert the trace from the retried query. + self.__assert_trace( + query_id=retried_query_id, + query_profile=retried_query_profile, + cluster_id="test_retry_select_failed", + trace_cnt=3, + err_span="QueryExecution", + missing_spans=["Submitted", "Planning"]) + + @CustomClusterTestSuite.with_args( + impalad_args="-v=2 --cluster_id=test_select_queued " + "--otel_trace_enabled=true --otel_trace_exporter=file " + "--otel_file_flush_interval_ms=500 " + "--otel_file_pattern={out_dir}/" + TRACE_FILE + " " + "--default_pool_max_requests=1", + cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) + def test_select_queued(self, vector): + # Launch two queries, the second will be queued until the first completes. + client = self.create_impala_client_from_vector(vector) + + query = "select * from functional.alltypes where id = 1" + handle1 = self.execute_query_async_using_client(client, + "{} and int_col = sleep(5000)".format(query), vector) + client.wait_for_impala_state(handle1, RUNNING, 60) + query_id_1 = client.handle_id(handle1) + + handle2 = self.execute_query_async_using_client(client, query, vector) + query_id_2 = client.handle_id(handle2) + + client.wait_for_impala_state(handle1, FINISHED, 60) + query_profile_1 = client.get_runtime_profile(handle1) + client.close_query(handle1) + + client.wait_for_impala_state(handle2, FINISHED, 60) + query_profile_2 = client.get_runtime_profile(handle2) + + client.close_query(handle2) + + self.__assert_trace( + query_id=query_id_1, + query_profile=query_profile_1, + cluster_id="test_select_queued", + trace_cnt=3) + + self.__assert_trace( + query_id=query_id_2, + query_profile=query_profile_2, + cluster_id="test_select_queued", + trace_cnt=3) + + ###################### + # Helper functions. 
+ ###################### + def __assert_trace(self, query_id, query_profile, cluster_id, trace_cnt=1, err_span="", + missing_spans=[]): + # Parse common values needed in multiple asserts. + session_id = parse_session_id(query_profile) + db_user = parse_db_user(query_profile) + + # Wait until all spans are written to the trace file. + trace_file_path = "{}/{}".format(self.get_tmp_dir(self.OUT_DIR), self.TRACE_FILE) + wait_for_file_line_count( + file_path=trace_file_path, + expected_line_count=trace_cnt, + max_attempts=60, + sleep_time_s=1, + backoff=1) + + # Remove missing spans from the expected span count. + expected_span_count = 6 - len(missing_spans) + + # Parse the trace json files to get the trace for the query. + trace = parse_trace_file(trace_file_path, query_id) + self.__assert_trace_common(trace, expected_span_count) + + # Retrieve the query status which contains error messages if the query failed. + query_status = parse_query_status(query_profile) + query_status = "" if query_status == "OK" else query_status + + impala_query_state = parse_retry_status(query_profile) + if impala_query_state is None: + impala_query_state = parse_impala_query_state(query_profile) + + # Determine if the query was retried and if so, get the original query id. + original_query_id = parse_original_query_id(query_profile) + original_query_id = "" if original_query_id is None else original_query_id + + # Determine if the query initially failed but has a successful retry under a different + # query id. If so, get the retried query id. + retried_query_id = parse_retried_query_id(query_profile) + retried_query_id = "" if retried_query_id is None else retried_query_id + + # Error message should follow on all spans after the errored span + in_error = False + + # Assert root span. 
+ root_span_id = self.__assert_rootspan_attrs(trace.root_span, query_id, session_id, + cluster_id, db_user, "default-pool", impala_query_state, query_status, + original_query_id, retried_query_id) + + # Assert Init span. + if "Init" not in missing_spans: + span_err_msg = "" + if err_span == "Init": + span_err_msg = query_status + in_error = True + self.__assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id, + cluster_id, db_user, "default-pool", "default", parse_sql(query_profile), + original_query_id) + + # Assert Submitted span. + if "Submitted" not in missing_spans: + span_err_msg = "" + if err_span == "Submitted" or in_error: + span_err_msg = query_status + in_error = True + self.__assert_submittedspan_attrs(trace.child_spans, root_span_id, query_id) + + # Assert Planning span. + if "Planning" not in missing_spans: + span_err_msg = "" + if err_span == "Planning" or in_error: + span_err_msg = query_status + in_error = True + query_type = parse_query_type(query_profile) + if query_type == "N/A": + query_type = "UNKNOWN" + self.__assert_planningspan_attrs(trace.child_spans, root_span_id, query_id, + query_type, span_err_msg) + + # Assert AdmissionControl span. + if "AdmissionControl" not in missing_spans: + self.__assert_admissioncontrol_attrs(trace.child_spans, root_span_id, query_id, + "default-pool", parse_admission_result(query_profile)) + + # Assert QueryExecution span. + if "QueryExecution" not in missing_spans: + span_err_msg = "" + if err_span == "QueryExecution" or in_error: + span_err_msg = query_status + in_error = True + self.__assert_query_exec_attrs(trace.child_spans, query_profile, root_span_id, + query_id, span_err_msg, parse_impala_query_state(query_profile)) + + # Assert Close span. 
+ if "Close" not in missing_spans: + span_err_msg = "" + if err_span == "Close" or in_error: + span_err_msg = query_status + in_error = True + self.__assert_close_attrs(trace.child_spans, root_span_id, query_id, span_err_msg, + parse_impala_query_state(query_profile)) + + def __assert_trace_common(self, trace, expected_child_spans_count): + """ + Asserts common structure/fields in resource spans and scope spans of the + OpenTelemetry trace JSON object. + """ + + # Assert the number of child spans in the trace. + assert len(trace.child_spans) == expected_child_spans_count, \ + "Trace '{}' expected child spans count: {}, actual: {}".format(trace.trace_id, + expected_child_spans_count, len(trace.child_spans)) + + # Each scope span has a scope object which contains the name and version of the + # OpenTelemetry scope. Assert the scope object sttructure and contents contained + # within the single span at the path resourceSpan[0].scopeSpans[0].scope. + assert trace.root_span.scope_name == "org.apache.impala.impalad.query", \ + "Span: '{}' expected: 'org.apache.impala.impalad.query', actual: {}" \ + .format(trace.root_span.span_id, trace.root_span.scope_name) + assert trace.root_span.scope_version == "1.0.0", "Span: '{}' expected scope " \ + "version '1.0.0', actual: '{}'".format("Root", trace.root_span.scope_version) + + # Assert the scope of each child span. + for span in trace.child_spans: + assert span.scope_name == "org.apache.impala.impalad.query", \ + "Span: '{}' expected scope name: 'org.apache.impala.impalad.query', " \ + "actual: {}".format(span.name, span.scope_name) + assert span.scope_version == "1.0.0", "Span: '{}' expected scope " \ + "version '1.0.0', actual: '{}'".format(span.name, span.scope_version) + + def __assert_scopespan_common(self, span, query_id, is_root, name, attributes_count, + status, root_span_id=None, err_msg=""): + """ + Helper function to assert common data points of a single scope span. 
These spans + contain the actual root and child spans. Assertions include the span object's + structure, span properties, and common span attributes. + - span: The OtelSpan object to assert. + - query_id: The query id of the span. + - is_root: Whether the span is a root span. + - name: The name of the span to assert without the query_id prefix. + - attributes_count: The expected number of attributes unique to the span. If + asserting a child span, adds 7 to this value to account for + attributes common across all child spans. + - status: The expected status of the span. Only used for child spans. + - root_span_id: The root span id of the span. + """ + + # Read the span trace id and span id from the Impalad logs. + expected_span_id, expected_trace_id = self.__find_span_log(name, query_id) + + # Assert span properties. + expected_name = query_id + actual_kind = span.kind + + if (is_root): + assert span.parent_span_id is None, "Found parentSpanId on root span" + assert actual_kind == 2, "Span '{}' expected kind: '{}', actual: '{}'" \ + .format(expected_name, 2, actual_kind) + else: + expected_name += " - {}".format(name) + + assert root_span_id is not None + actual = span.parent_span_id + assert actual == root_span_id, "Span '{}' expected parentSpanId: '{}', actual: " \ + "'{}'".format(expected_name, root_span_id, actual) + + assert actual_kind == 1, "Span '{}' expected kind: '{}', actual: '{}'" \ + .format(expected_name, 1, actual) + + actual = span.name + assert actual == expected_name, "Expected span name: '{}', actual: '{}'" \ + .format(expected_name, actual) + + actual = span.trace_id + assert actual == expected_trace_id, "Span '{}' expected traceId: '{}', " \ + "actual: '{}'".format(expected_name, expected_trace_id, actual) + + actual = span.span_id + assert actual == expected_span_id, "Span '{}' expected spanId: '{}', " \ + "actual: '{}'".format(expected_name, expected_span_id, actual) + + # Flags must always be 1 which indicates the trace is to be sampled. 
+ expected_flags = 1 + actual = span.flags + assert actual == expected_flags, "Span '{}' expected flags: '{}', " \ + "actual: '{}'".format(expected_name, expected_flags, actual) + + # Assert span attributes. + expected_span_attrs_count = attributes_count if is_root else 7 + attributes_count + assert len(span.attributes) == expected_span_attrs_count, "Span '{}' attributes " \ + "must contain exactly {} elements, actual: {}".format(expected_name, + expected_span_attrs_count, len(span.attributes)) + + if (is_root): + self.__assert_attr(expected_name, span.attributes, "ErrorMessage", err_msg) + else: + self.__assert_attr(expected_name, span.attributes, "ErrorMsg", err_msg) + self.__assert_attr(expected_name, span.attributes, "Name", expected_name) + self.__assert_attr(expected_name, span.attributes, "Running", + name == "QueryExecution", "boolValue") + self.__assert_attr(expected_name, span.attributes, "Status", status) + + def __find_span_log(self, span_name, query_id): + """ + Finds the start span log entry for the given span name and query id in the Impalad + logs. This log line contains the trace id and span id for the span which are used + as the expected values when asserting the span properties in the trace file. + """ + span_regex = r'Started \'{}\' span trace_id="(.*?)" span_id="(.*?)" query_id="{}"' \ + .format(span_name, query_id) + span_log = self.assert_impalad_log_contains("INFO", span_regex) + trace_id = span_log.group(1) + span_id = span_log.group(2) + + return span_id, trace_id + + def __assert_attr(self, span_name, attributes, expected_key, expected_value, + expected_type="stringValue"): + """ + Helper function to assert that a specific OpenTelemetry attribute exists in a span. 
+ """ + + assert expected_type in ("stringValue", "boolValue", "intValue"), "Invalid " \ + "expected_type '{}', must be one of 'stringValue', 'boolValue', or 'intValue'" \ + .format(expected_type) + + val = attributes[expected_key] + assert val is not None, "Span '{}' attribute not found: '{}', actual attributes: {}" \ + .format(span_name, expected_key, attributes) + assert val.value == expected_value, "Span '{}' attribute '{}' expected: '{}', " \ + "actual: '{}'".format(span_name, expected_key, expected_value, val.value) + + if expected_type == "boolValue": + expected_type = ATTR_VAL_TYPE_BOOL + elif expected_type == "intValue": + expected_type = ATTR_VAL_TYPE_INT + else: + expected_type = ATTR_VAL_TYPE_STRING + + assert val.get_type() == expected_type, "Span '{}' attribute '{}' expected to be " \ + "of type '{}', actual: '{}'".format(span_name, expected_key, expected_type, + val.get_type()) + + def __assert_rootspan_attrs(self, span, query_id, session_id, cluster_id, user_name, + request_pool, state, err_msg, original_query_id, retried_query_id): + """ + Helper function that asserts the common attributes in the root span. 
+ """ + + root_span_id, _ = self.__find_span_log("Root", query_id) + self.__assert_scopespan_common(span, query_id, True, "Root", 13, "", None, err_msg) + + self.__assert_attr(span.name, span.attributes, "QueryId", query_id) + self.__assert_attr(span.name, span.attributes, "SessionId", session_id) + self.__assert_attr(span.name, span.attributes, "ClusterId", cluster_id) + self.__assert_attr(span.name, span.attributes, "UserName", user_name) + self.__assert_attr(span.name, span.attributes, "RequestPool", request_pool) + self.__assert_attr(span.name, span.attributes, "State", state) + self.__assert_attr(span.name, span.attributes, "OriginalQueryId", original_query_id) + self.__assert_attr(span.name, span.attributes, "RetriedQueryId", retried_query_id) + + return root_span_id + + def __assert_initspan_attrs(self, spans, root_span_id, query_id, session_id, cluster_id, + user_name, request_pool, default_db, query_string, original_query_id): + """ + Helper function that asserts the common and span-specific attributes in the + init span. + """ + + # Locate the init span and assert. 
+ init_span = self.__find_span(spans, "Init", query_id) + + self.__assert_scopespan_common(init_span, query_id, False, "Init", 8, "INITIALIZED", + root_span_id) + + self.__assert_attr(init_span.name, init_span.attributes, "QueryId", query_id) + self.__assert_attr(init_span.name, init_span.attributes, "SessionId", session_id) + self.__assert_attr(init_span.name, init_span.attributes, "ClusterId", cluster_id) + self.__assert_attr(init_span.name, init_span.attributes, "UserName", user_name) + self.__assert_attr(init_span.name, init_span.attributes, "RequestPool", request_pool) + self.__assert_attr(init_span.name, init_span.attributes, "DefaultDb", default_db) + self.__assert_attr(init_span.name, init_span.attributes, "QueryString", query_string) + self.__assert_attr(init_span.name, init_span.attributes, "OriginalQueryId", + original_query_id) + + def __assert_submittedspan_attrs(self, spans, root_span_id, query_id): + """ + Helper function that asserts the common attributes in the submitted span. + """ + + submitted_span = self.__find_span(spans, "Submitted", query_id) + self.__assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0, + "INITIALIZED", root_span_id) + + def __assert_planningspan_attrs(self, spans, root_span_id, query_id, query_type, + err_msg="", status="INITIALIZED"): + """ + Helper function that asserts the common and span-specific attributes in the + planning execution span. + """ + + planning_span = self.__find_span(spans, "Planning", query_id) + self.__assert_scopespan_common(planning_span, query_id, False, "Planning", 1, + status, root_span_id, err_msg) + self.__assert_attr(planning_span.name, planning_span.attributes, "QueryType", + query_type) + + def __assert_admissioncontrol_attrs(self, spans, root_span_id, query_id, request_pool, + adm_result, err_msg="", status="PENDING"): + """ + Helper function that asserts the common and span-specific attributes in the + admission control span. 
+ """ + + queued = False if adm_result == "Admitted immediately" else True + + adm_ctrl_span = self.__find_span(spans, "AdmissionControl", query_id) + self.__assert_scopespan_common(adm_ctrl_span, query_id, False, "AdmissionControl", 3, + status, root_span_id, err_msg) + self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "Queued", + queued, "boolValue") + self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "AdmissionResult", + adm_result) + self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "RequestPool", + request_pool) + + def __assert_query_exec_attrs(self, spans, query_profile, root_span_id, query_id, + err_msg, status): + """ + Helper function that asserts the common and span-specific attributes in the + query execution span. + """ + + query_exec_span = self.__find_span(spans, "QueryExecution", query_id) + self.__assert_scopespan_common(query_exec_span, query_id, False, "QueryExecution", 3, + status, root_span_id, err_msg) + self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumDeletedRows", + 0, "intValue") + self.__assert_attr(query_exec_span.name, query_exec_span.attributes, + "NumModifiedRows", 0, "intValue") + self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumRowsFetched", + parse_num_rows_fetched(query_profile), "intValue") + + def __assert_close_attrs(self, spans, root_span_id, query_id, + err_msg="", status=FINISHED): + """ + Helper function that asserts the common and span-specific attributes in the + close span. + """ + + close_span = self.__find_span(spans, "Close", query_id) + self.__assert_scopespan_common(close_span, query_id, False, "Close", 0, status, + root_span_id, err_msg) + + def __find_span(self, spans, name, query_id): + """ + Helper function to find a span by name in a list of OtelSpan objects. 
+ """ + + for s in spans: + if s.name.endswith(name): + return s + + assert False, "Span '{}' not found for query '{}'".format(name, query_id) diff --git a/tests/util/otel_trace.py b/tests/util/otel_trace.py new file mode 100644 index 000000000..234722953 --- /dev/null +++ b/tests/util/otel_trace.py @@ -0,0 +1,278 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function + +import json +import os +import sys + +from tests.common.environ import IMPALA_LOCAL_BUILD_VERSION + +# Valid types of OpenTelemetry attribute values. +ATTR_VAL_TYPE_STRING = "string" +ATTR_VAL_TYPE_INT = "int" +ATTR_VAL_TYPE_BOOL = "bool" + + +class AttributeValue: + """ + Represents a value of an OpenTelemetry attribute. The key is not stored here, only + the attribute value. Additionally, the class function get_type() returns the attribute + value's type. 
+ """ + def __init__(self, attr): + assert attr["value"] is not None, "Attribute missing value: {}".format(attr) + + v = attr["value"] + if "stringValue" in v: + self.value = v["stringValue"] + elif "intValue" in v: + self.value = int(v["intValue"]) + elif "boolValue" in v: + self.value = v["boolValue"] + else: + raise Exception("Unsupported attribute value type: %s" % str(v)) + + def get_type(self): + if isinstance(self.value, bool): + return ATTR_VAL_TYPE_BOOL + elif isinstance(self.value, int): + return ATTR_VAL_TYPE_INT + else: + return ATTR_VAL_TYPE_STRING + + def __str__(self): + """ + Returns a string representation of the AttributeValue object. + This method is called when print() is used on an instance of this class. + """ + return "AttributeValue(type='{}', value='{}')".format( + self.get_type(), str(self.value)) + + +class OtelTrace(): + """ + Represents a single OpenTelemetry trace, which consists of a root span and zero or + more child spans. Spans are represented by the OtelSpan class. Child spans cannot be + parent spans to other child spans in this representation. + + Attributes: + trace_id: The trace ID of this trace. + root_span: The root span of this trace (an OtelSpan object). + child_spans: A list of child spans (OtelSpan objects) that belong to this trace. + """ + def __init__(self, trace_id): + self.trace_id = trace_id + self.root_span = None + self.child_spans = [] + + def __str__(self): + """ + Returns a string representation of the OtelTrace object. + This method is called when print() is used on an instance of this class. + """ + s = "OtelTrace(trace_id='{}', root_span={}, child_spans=".format( + self.trace_id, self.root_span) + if len(self.child_spans) == 0: + s += "[])" + else: + for cs in self.child_spans: + s += " {},\n".format(cs) + s += "])" + + return s + + +class OtelSpan: + """ + Represents a single OpenTelemetry span. + + Attributes: + scope_name: The name of the scope that produced this span. 
+ scope_version: The version of the scope that produced this span. + attributes: A dictionary of attribute key to AttributeValue object. + start_time: The start time of the span in nanoseconds since epoch. + end_time: The end time of the span in nanoseconds since epoch. + flags: The OpenTelemetry trace flags of the span (represented as an integer). + kind: The OpenTelemetry kind of the span (integer). + name: The OpenTelemetry name of the span. + parent_span_id: The span ID of the parent span, or None if this is a root span. + span_id: The span ID of this span. + trace_id: The trace ID of this span. + query_id: The query ID associated with this span. This value is extracted from the + QueryId attribute. Until an attribute with that key is added via the + add_attribute() method, this value is an empty string. + """ + def __init__(self): + self.scope_name = "" + self.scope_version = "" + self.attributes = {} + self.start_time = 0 + self.end_time = 0 + self.flags = -1 + self.kind = -1 + self.name = "" + self.parent_span_id = None + self.span_id = "" + self.trace_id = "" + self.query_id = "" + + def is_root(self): + return self.parent_span_id is None + + def add_attribute(self, key, value): + self.attributes[key] = value + if key == "QueryId": + self.query_id = value.value + + def __str__(self): + """ + Returns a string representation of the OtelSpan object. + This method is called when print() is used on an instance of this class. 
+ """ + s = "OtelSpan(name='{}', span_id='{}', trace_id='{}', parent_span_id='{}', " \ + "start_time={}, end_time={}, kind={}, flags={}, scope_name='{}', " \ + "scope_version='{}', query_id='{}', attributes={{".format( + self.name, self.span_id, self.trace_id, self.parent_span_id, + self.start_time, self.end_time, self.kind, self.flags, + self.scope_name, self.scope_version, self.query_id) + for k in self.attributes: + s += "\n '{}': {},".format(k, self.attributes[k]) + s += "\n })" + + return s + + +def __parse_attr(attr): + """Internal helper to parse a single attribute from the json object representing it.""" + assert attr["key"] is not None, "Attribute missing key: {}".format(attr) + return attr["key"], AttributeValue(attr) + + +def __parse_line(line): + """Internal helper to parse a single line of the trace file, which is expected to be + a json object representing one or more resource spans. Returns a list of OtelSpan + objects parsed from the line. + """ + obj = json.loads(line.strip()) + assert obj is not None, "Failed to parse line in json:\n{}".format(line) + + parsed_spans = [] + res_idx = -1 + scope_idx = -1 + span_idx = -1 + attr_idx = -1 + + try: + resource_spans = obj["resourceSpans"] + + # Expected resource span attribute keys/values. + expected_resource_attrs = { + "service.name": "Impala", + "service.version": IMPALA_LOCAL_BUILD_VERSION, + "telemetry.sdk.version": os.environ.get("IMPALA_OPENTELEMETRY_CPP_VERSION"), + "telemetry.sdk.name": "opentelemetry", + "telemetry.sdk.language": "cpp"} + + # loop through each resource span + for res_idx, res_span in enumerate(resource_spans): + # Assert resource attributes. 
+ for attr in res_span["resource"]["attributes"]: + k, v = __parse_attr(attr) + expected_value = expected_resource_attrs.get(k) + assert expected_value is not None, "Unexpected resource attribute key: '{}'" \ + .format(k) + assert v.value == expected_value, "Unexpected value '{}' for resource " \ + "attribute '{}', expected '{}'".format(v.value, k, expected_value) + + # Parse each scope span. + scope_spans = res_span["scopeSpans"] + for scope_idx, scope_span in enumerate(scope_spans): + scope_name = scope_span["scope"]["name"] + scope_version = scope_span["scope"]["version"] + + # Parse each span. + for span_idx, span in enumerate(scope_span["spans"]): + s = OtelSpan() + s.scope_name = scope_name + s.scope_version = scope_version + s.start_time = int(span["startTimeUnixNano"]) + s.end_time = int(span["endTimeUnixNano"]) + s.name = span["name"] + s.flags = int(span["flags"]) + s.kind = int(span["kind"]) + s.span_id = span["spanId"] + s.trace_id = span["traceId"] + if "parentSpanId" in span: + s.parent_span_id = span["parentSpanId"] + + # Parse each span attribute list. + for attr_idx, attr in enumerate(span["attributes"]): + key, value = __parse_attr(attr) + s.add_attribute(key, value) + + parsed_spans.append(s) + except Exception as e: + sys.stderr.write("Failed to parse json:\n{}".format(line)) + sys.stderr.write("Resource Span Index: {}\n".format(res_idx)) + sys.stderr.write("Scope Span Index: {}\n".format(scope_idx)) + sys.stderr.write("Span Index: {}\n".format(span_idx)) + sys.stderr.write("Attribute Index: {}\n".format(attr_idx)) + sys.stderr.flush() + raise e + + return parsed_spans + + +def parse_trace_file(file_path, query_id): + """ + Parses the OpenTelemetry trace file located at 'file_path' and returns the OtelTrace + object for the trace that contains the given 'query_id'. Fails an assertion if no + trace with the given query ID is found, or if the trace does not have a root span. 
+ """ + traces_by_trace_id = {} + traces_by_query_id = {} + parsed_spans = [] + + with open(file_path, "r") as f: + lines = f.readlines() + for line in lines: + parsed_spans.extend(__parse_line(line)) + + # Build a map of query_id -> OtelTrace for easy lookup. + # First, locate all root spans + for s in parsed_spans: + if s.trace_id not in traces_by_trace_id: + traces_by_trace_id[s.trace_id] = OtelTrace(s.trace_id) + + if s.is_root(): + traces_by_trace_id[s.trace_id].root_span = s + traces_by_query_id[s.query_id] = traces_by_trace_id[s.trace_id] + else: + traces_by_trace_id[s.trace_id].child_spans.append(s) + + assert len(traces_by_query_id) > 0, "No root span(s) in the file: {}".format(file_path) + assert query_id in traces_by_query_id, "Could not find trace for query: {}" \ + .format(query_id) + + query_trace = traces_by_query_id[query_id] + assert query_trace is not None, "Trace was None for query: {}".format(query_id) + assert query_trace.root_span is not None, "Trace for query '{}' has no root span" \ + .format(query_id) + + return query_trace diff --git a/tests/util/query_profile_util.py b/tests/util/query_profile_util.py new file mode 100644 index 000000000..04e76e3d3 --- /dev/null +++ b/tests/util/query_profile_util.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
# See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

import re


def _find_profile_field(profile_text, pattern, flags=0):
  """Returns capture group 1 of the first match of 'pattern' in 'profile_text', or None
  if the pattern does not match anywhere in the text."""
  match = re.search(pattern, profile_text, flags)
  return None if match is None else match.group(1)


def _require_profile_field(profile_text, pattern, description, flags=0):
  """Same as _find_profile_field() except that a missing field fails an assertion whose
  message names the field ('description'), so test failures state what was absent."""
  value = _find_profile_field(profile_text, pattern, flags)
  assert value is not None, "{} not found in query profile".format(description)
  return value


def parse_db_user(profile_text):
  """Parses the user from the query profile text."""
  return _require_profile_field(profile_text, r'\n\s+User:\s+(.*?)\n', "User")


def parse_session_id(profile_text):
  """Parses the session id from the query profile text."""
  return _require_profile_field(profile_text, r'\n\s+Session ID:\s+(.*)\n', "Session ID")


def parse_sql(profile_text):
  """Parses the SQL statement from the query profile text. Only the first line of the
  statement is captured because the pattern stops at the first newline."""
  return _require_profile_field(profile_text, r'\n\s+Sql Statement:\s+(.*?)\n',
      "Sql Statement")


def parse_query_type(profile_text):
  """Parses the query type from the query profile text."""
  return _require_profile_field(profile_text, r'\n\s+Query Type:\s+(.*?)\n',
      "Query Type")


def parse_query_state(profile_text):
  """Parses the query state from the query profile text."""
  # The leading '\n\s+' keeps this from matching the "Impala Query State:" line.
  return _require_profile_field(profile_text, r'\n\s+Query State:\s+(.*?)\n',
      "Query State")


def parse_impala_query_state(profile_text):
  """Parses the Impala query state from the query profile text."""
  return _require_profile_field(profile_text, r'\n\s+Impala Query State:\s+(.*?)\n',
      "Impala Query State")


def parse_query_status(profile_text):
  """Parses the query status from the query profile text. Status can be multiple lines if
  the query errored."""
  # DOTALL lets the capture span newlines; it ends at the line that starts with
  # "Impala Version".
  return _require_profile_field(profile_text,
      r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', "Query Status", re.DOTALL)


def parse_query_id(profile_text):
  """Parses the query id from the query profile text."""
  return _require_profile_field(profile_text, r'Query\s+\(id=(.*?)\):', "Query Id")


def parse_retry_status(profile_text):
  """Parses the retry status from the query profile text. Returns None if the retry
  status is not present in the profile text."""
  return _find_profile_field(profile_text, r'\n\s+Retry Status:\s+(.*?)\n')


def parse_original_query_id(profile_text):
  """Parses the original query id from the query profile text. Returns None if the
  original query id is not present in the profile text."""
  return _find_profile_field(profile_text, r'\n\s+Original Query Id:\s+(.*?)\n')


def parse_retried_query_id(profile_text):
  """Parses the retried query id from the query profile text. Returns None if the
  retried query id is not present in the profile text."""
  return _find_profile_field(profile_text, r'\n\s+Retried Query Id:\s+(.*?)\n')


def parse_num_rows_fetched(profile_text):
  """Parses the number of rows fetched from the query profile text and returns it as an
  int."""
  return int(_require_profile_field(profile_text,
      r'\n\s+\-\sNumRowsFetched:\s+(\d+)', "Number of Rows Fetched"))


def parse_admission_result(profile_text):
  """Parses the admission result from the query profile text."""
  return _require_profile_field(profile_text, r'\n\s+Admission result:\s+(.*?)\n',
      "Admission Result")