lidavidm commented on code in PR #11920:
URL: https://github.com/apache/arrow/pull/11920#discussion_r887299997


##########
cpp/src/arrow/flight/flight_test.cc:
##########
@@ -1534,5 +1556,136 @@ TEST_F(TestCancel, DoExchange) {
   ARROW_UNUSED(do_exchange_result.writer->Close());
 }
 
+class TracingTestServer : public FlightServerBase {
+ public:
+  Status DoAction(const ServerCallContext& call_context, const Action&,
+                  std::unique_ptr<ResultStream>* result) override {
+    std::vector<Result> results;
+    auto* middleware =
+        
reinterpret_cast<TracingServerMiddleware*>(call_context.GetMiddleware("tracing"));
+    if (!middleware) return Status::Invalid("Could not find middleware");
+#ifdef ARROW_WITH_OPENTELEMETRY
+    EXPECT_GT(middleware->GetTraceContext().size(), 0);
+    auto span = arrow::internal::tracing::GetTracer()->GetCurrentSpan();
+    const auto context = span->GetContext();
+    {
+      const auto& span_id = context.span_id();
+      ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(span_id.Id().size()));
+      std::memcpy(buffer->mutable_data(), span_id.Id().data(), 
span_id.Id().size());
+      results.emplace_back(std::move(buffer));
+    }
+    {
+      const auto& trace_id = context.trace_id();
+      ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(trace_id.Id().size()));
+      std::memcpy(buffer->mutable_data(), trace_id.Id().data(), 
trace_id.Id().size());
+      results.emplace_back(std::move(buffer));
+    }
+#else
+    EXPECT_EQ(middleware->GetTraceContext().size(), 0);
+#endif
+    *result = 
arrow::internal::make_unique<SimpleResultStream>(std::move(results));
+    return Status::OK();
+  }
+};
+
+class TestTracing : public ::testing::Test {
+ public:
+  void SetUp() {
+#ifdef ARROW_WITH_OPENTELEMETRY
+    // The default tracer always generates no-op spans which have no
+    // span/trace ID. Set up a different tracer. Note, this needs to
+    // be run before Arrow uses OTel as GetTracer() gets a tracer once
+    // and keeps it in a static.
+    std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor>> 
processors;
+    auto provider =
+        
opentelemetry::nostd::shared_ptr<opentelemetry::sdk::trace::TracerProvider>(
+            new 
opentelemetry::sdk::trace::TracerProvider(std::move(processors)));
+    opentelemetry::trace::Provider::SetTracerProvider(std::move(provider));
+
+    
opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(
+        opentelemetry::nostd::shared_ptr<
+            opentelemetry::context::propagation::TextMapPropagator>(
+            new opentelemetry::trace::propagation::HttpTraceContext()));

Review Comment:
   Yes, the application would do this as part of initializing OpenTelemetry, 
though generally the SDKs provide conveniences to configure this. 



##########
cpp/src/arrow/flight/server_tracing_middleware.cc:
##########
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/server_tracing_middleware.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/flight/transport/grpc/util_internal.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tracing_internal.h"
+
+#ifdef ARROW_WITH_OPENTELEMETRY
+#include <opentelemetry/context/propagation/global_propagator.h>
+#include <opentelemetry/context/propagation/text_map_propagator.h>
+#include <opentelemetry/trace/context.h>
+#include <opentelemetry/trace/experimental_semantic_conventions.h>
+#include <opentelemetry/trace/propagation/http_trace_context.h>
+#endif
+
+namespace arrow {
+namespace flight {
+
+#ifdef ARROW_WITH_OPENTELEMETRY
+namespace otel = opentelemetry;
+namespace {
+class FlightServerCarrier : public otel::context::propagation::TextMapCarrier {
+ public:
+  explicit FlightServerCarrier(const CallHeaders& incoming_headers)
+      : incoming_headers_(incoming_headers) {}
+
+  otel::nostd::string_view Get(otel::nostd::string_view key) const noexcept 
override {
+    util::string_view arrow_key(key.data(), key.size());
+    auto it = incoming_headers_.find(arrow_key);
+    if (it == incoming_headers_.end()) return "";
+    util::string_view result = it->second;
+    return {result.data(), result.size()};
+  }
+
+  void Set(otel::nostd::string_view, otel::nostd::string_view) noexcept 
override {}
+
+  const CallHeaders& incoming_headers_;
+};
+class KeyValueCarrier : public otel::context::propagation::TextMapCarrier {
+ public:
+  explicit KeyValueCarrier(std::vector<TracingServerMiddleware::TraceKey>* 
items)
+      : items_(items) {}
+  otel::nostd::string_view Get(otel::nostd::string_view key) const noexcept 
override {
+    return {};
+  }
+  void Set(otel::nostd::string_view key,
+           otel::nostd::string_view value) noexcept override {
+    items_->emplace_back(std::string(key), std::string(value));
+  }
+
+ private:
+  std::vector<TracingServerMiddleware::TraceKey>* items_;
+};
+}  // namespace
+
+class TracingServerMiddleware::Impl {
+ public:
+  Impl(otel::trace::Scope scope, otel::nostd::shared_ptr<otel::trace::Span> 
span)
+      : scope_(std::move(scope)), span_(std::move(span)) {}
+  void CallCompleted(const Status& status) {
+    if (!status.ok()) {
+      auto grpc_status = transport::grpc::ToGrpcStatus(status, 
/*ctx=*/nullptr);
+      span_->SetStatus(otel::trace::StatusCode::kError, status.ToString());
+      span_->SetAttribute(OTEL_GET_TRACE_ATTR(AttrRpcGrpcStatusCode),
+                          static_cast<int32_t>(grpc_status.error_code()));
+    } else {
+      span_->SetStatus(otel::trace::StatusCode::kOk, "");
+      span_->SetAttribute(OTEL_GET_TRACE_ATTR(AttrRpcGrpcStatusCode), 
int32_t(0));
+    }

Review Comment:
   It'll end when the Span goes out of scope, but I'll just manually end it 
here to be explicit.



##########
python/pyarrow/tests/test_flight.py:
##########
@@ -2177,3 +2177,32 @@ def test_interpreter_shutdown():
     See https://issues.apache.org/jira/browse/ARROW-16597.
     """
     util.invoke_script("arrow_16597.py")
+
+
+class TracingFlightServer(FlightServerBase):
+    """A server that echoes back trace context values."""
+
+    def do_action(self, context, action):
+        trace_context = context.get_middleware("tracing").trace_context
+        # Don't turn this method into a generator since then it'll be
+        # lazily evaluated, and the trace context will be lost

Review Comment:
   Evaluating `.trace_context` is side-effectful and depends on implicit state 
maintained by OpenTelemetry, so if this is a generator it'll be evaluated after 
OpenTelemetry has already cleaned up the state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to