lidavidm commented on code in PR #11920: URL: https://github.com/apache/arrow/pull/11920#discussion_r887299997
########## cpp/src/arrow/flight/flight_test.cc: ########## @@ -1534,5 +1556,136 @@ TEST_F(TestCancel, DoExchange) { ARROW_UNUSED(do_exchange_result.writer->Close()); } +class TracingTestServer : public FlightServerBase { + public: + Status DoAction(const ServerCallContext& call_context, const Action&, + std::unique_ptr<ResultStream>* result) override { + std::vector<Result> results; + auto* middleware = + reinterpret_cast<TracingServerMiddleware*>(call_context.GetMiddleware("tracing")); + if (!middleware) return Status::Invalid("Could not find middleware"); +#ifdef ARROW_WITH_OPENTELEMETRY + EXPECT_GT(middleware->GetTraceContext().size(), 0); + auto span = arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + const auto context = span->GetContext(); + { + const auto& span_id = context.span_id(); + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(span_id.Id().size())); + std::memcpy(buffer->mutable_data(), span_id.Id().data(), span_id.Id().size()); + results.emplace_back(std::move(buffer)); + } + { + const auto& trace_id = context.trace_id(); + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(trace_id.Id().size())); + std::memcpy(buffer->mutable_data(), trace_id.Id().data(), trace_id.Id().size()); + results.emplace_back(std::move(buffer)); + } +#else + EXPECT_EQ(middleware->GetTraceContext().size(), 0); +#endif + *result = arrow::internal::make_unique<SimpleResultStream>(std::move(results)); + return Status::OK(); + } +}; + +class TestTracing : public ::testing::Test { + public: + void SetUp() { +#ifdef ARROW_WITH_OPENTELEMETRY + // The default tracer always generates no-op spans which have no + // span/trace ID. Set up a different tracer. Note, this needs to + // be run before Arrow uses OTel as GetTracer() gets a tracer once + // and keeps it in a static. + std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor>> processors; + auto provider = + opentelemetry::nostd::shared_ptr<opentelemetry::sdk::trace::TracerProvider>( + new opentelemetry::sdk::trace::TracerProvider(std::move(processors))); + opentelemetry::trace::Provider::SetTracerProvider(std::move(provider)); + + opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator( + opentelemetry::nostd::shared_ptr< + opentelemetry::context::propagation::TextMapPropagator>( + new opentelemetry::trace::propagation::HttpTraceContext())); Review Comment: Yes, the application would do this as part of initializing OpenTelemetry, though generally the SDKs provide conveniences to configure this. ########## cpp/src/arrow/flight/server_tracing_middleware.cc: ########## @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/flight/server_tracing_middleware.h" + +#include <string> +#include <utility> +#include <vector> + +#include "arrow/flight/transport/grpc/util_internal.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/tracing_internal.h" + +#ifdef ARROW_WITH_OPENTELEMETRY +#include <opentelemetry/context/propagation/global_propagator.h> +#include <opentelemetry/context/propagation/text_map_propagator.h> +#include <opentelemetry/trace/context.h> +#include <opentelemetry/trace/experimental_semantic_conventions.h> +#include <opentelemetry/trace/propagation/http_trace_context.h> +#endif + +namespace arrow { +namespace flight { + +#ifdef ARROW_WITH_OPENTELEMETRY +namespace otel = opentelemetry; +namespace { +class FlightServerCarrier : public otel::context::propagation::TextMapCarrier { + public: + explicit FlightServerCarrier(const CallHeaders& incoming_headers) + : incoming_headers_(incoming_headers) {} + + otel::nostd::string_view Get(otel::nostd::string_view key) const noexcept override { + util::string_view arrow_key(key.data(), key.size()); + auto it = incoming_headers_.find(arrow_key); + if (it == incoming_headers_.end()) return ""; + util::string_view result = it->second; + return {result.data(), result.size()}; + } + + void Set(otel::nostd::string_view, otel::nostd::string_view) noexcept override {} + + const CallHeaders& incoming_headers_; +}; +class KeyValueCarrier : public otel::context::propagation::TextMapCarrier { + public: + explicit KeyValueCarrier(std::vector<TracingServerMiddleware::TraceKey>* items) + : items_(items) {} + otel::nostd::string_view Get(otel::nostd::string_view key) const noexcept override { + return {}; + } + void Set(otel::nostd::string_view key, + otel::nostd::string_view value) noexcept override { + items_->emplace_back(std::string(key), std::string(value)); + } + + private: + std::vector<TracingServerMiddleware::TraceKey>* items_; +}; +} // namespace + +class TracingServerMiddleware::Impl { + public: + Impl(otel::trace::Scope scope, otel::nostd::shared_ptr<otel::trace::Span> span) + : scope_(std::move(scope)), span_(std::move(span)) {} + void CallCompleted(const Status& status) { + if (!status.ok()) { + auto grpc_status = transport::grpc::ToGrpcStatus(status, /*ctx=*/nullptr); + span_->SetStatus(otel::trace::StatusCode::kError, status.ToString()); + span_->SetAttribute(OTEL_GET_TRACE_ATTR(AttrRpcGrpcStatusCode), + static_cast<int32_t>(grpc_status.error_code())); + } else { + span_->SetStatus(otel::trace::StatusCode::kOk, ""); + span_->SetAttribute(OTEL_GET_TRACE_ATTR(AttrRpcGrpcStatusCode), int32_t(0)); + } Review Comment: It'll end when the Span goes out of scope, but I'll just manually end it here to be explicit. ########## python/pyarrow/tests/test_flight.py: ########## @@ -2177,3 +2177,32 @@ def test_interpreter_shutdown(): See https://issues.apache.org/jira/browse/ARROW-16597. """ util.invoke_script("arrow_16597.py") + + +class TracingFlightServer(FlightServerBase): + """A server that echoes back trace context values.""" + + def do_action(self, context, action): + trace_context = context.get_middleware("tracing").trace_context + # Don't turn this method into a generator since then it'll be + # lazily evaluated, and the trace context will be lost Review Comment: Evaluating `.trace_context` is side-effectful and depends on implicit state maintained by OpenTelemetry, so if this is a generator it'll be evaluated after OpenTelemetry has already cleaned up the state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org