This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new 55f7d6d  [Python][Flight] Show how to propagate opentelemetry spans to Flight (#299)
55f7d6d is described below

commit 55f7d6dcf7888e4aa4af84cdbd659bfa6acd7d58
Author: Will Jones <[email protected]>
AuthorDate: Thu Mar 16 12:19:28 2023 -0700

    [Python][Flight] Show how to propagate opentelemetry spans to Flight (#299)
    
    Closes #297
    
    ---------
    
    Co-authored-by: Bryce Mecum <[email protected]>
---
 python/requirements.txt  |   2 +
 python/source/flight.rst | 217 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 218 insertions(+), 1 deletion(-)

diff --git a/python/requirements.txt b/python/requirements.txt
index 8d2a3f0..1d6e61a 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,3 +1,5 @@
 Sphinx>=4.0.2
 pyarrow==10.0.1
 pandas>=1.2.5
+opentelemetry-api>=1.0.0
+opentelemetry-sdk>=1.0.0
\ No newline at end of file
diff --git a/python/source/flight.rst b/python/source/flight.rst
index 6343e2b..67c359c 100644
--- a/python/source/flight.rst
+++ b/python/source/flight.rst
@@ -752,4 +752,219 @@ Running the client script, you should see the server printing out information ab
 .. _IBM article: https://www.ibm.com/docs/en/arl/9.7?topic=certification-extracting-certificate-keys-from-pfx-file
 .. _Windows: https://docs.microsoft.com/en-us/dotnet/core/additional-tools/self-signed-certificates-guide
 .. _Arrow testing data repository: https://github.com/apache/arrow-testing/tree/master/data/flight
-.. _openssl: https://www.ibm.com/docs/en/api-connect/2018.x?topic=overview-generating-self-signed-certificate-using-openssl
\ No newline at end of file
+.. _openssl: https://www.ibm.com/docs/en/api-connect/2018.x?topic=overview-generating-self-signed-certificate-using-openssl
+
+Propagating OpenTelemetry Traces
+================================
+
+Distributed tracing with OpenTelemetry_ lets you collect call-level performance
+measurements across a Flight service. To correlate spans across a Flight client
+and server, the trace context must be passed between the two. It can be passed
+manually through the ``headers`` of :class:`pyarrow.flight.FlightCallOptions`,
+or propagated automatically using middleware.
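To make manual propagation concrete: in its default configuration, OpenTelemetry Python propagates context using the W3C Trace Context format (together with W3C Baggage), so the header that carries the span is ``traceparent``. The following is a minimal, standard-library-only sketch of that header. ``make_traceparent`` and ``new_trace_headers`` are hypothetical helpers, not part of pyarrow or OpenTelemetry, and the IDs are random rather than taken from a real span.

```python
import secrets


def make_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Format a W3C Trace Context ``traceparent`` value (version 00).

    Layout: version-trace_id-parent_id-flags, all lowercase hex.
    """
    # IDs must fit in 128 and 64 bits; all-zero IDs are invalid per the spec.
    if not (0 < trace_id < 1 << 128 and 0 < span_id < 1 << 64):
        raise ValueError("trace_id/span_id out of range or zero")
    flags = "01" if sampled else "00"  # bit 0 is the "sampled" flag
    return f"00-{trace_id:032x}-{span_id:016x}-{flags}"


def new_trace_headers(sampled: bool = True) -> list:
    """Start a new (hypothetical) trace and return (name, value) pairs.

    The list has the shape accepted by the ``headers`` argument of
    pyarrow.flight.FlightCallOptions.
    """
    trace_id = secrets.randbits(128) or 1  # avoid the invalid all-zero ID
    span_id = secrets.randbits(64) or 1
    return [("traceparent", make_traceparent(trace_id, span_id, sampled))]
```

In real code you would let OpenTelemetry produce the header instead, by calling ``inject(carrier)`` inside an active span and passing ``flight.FlightCallOptions(headers=list(carrier.items()))`` as the ``options`` argument of a call such as ``client.do_action``.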
+
+This example shows how to accomplish trace propagation through middleware.
+The client middleware needs to inject the trace context into the call headers.
+The server middleware needs to extract the trace context from the headers and
+pass the context into a new span. Optionally, the client middleware can also
+create a new span to time the client-side call.
+
+.. _OpenTelemetry: https://opentelemetry.io/docs/instrumentation/python/getting-started/
+
+**Step 1: define the client middleware:**
+
+.. testcode::
+
+    import pyarrow.flight as flight
+    from opentelemetry import trace
+    from opentelemetry.propagate import inject
+    from opentelemetry.trace.status import StatusCode
+
+    class ClientTracingMiddlewareFactory(flight.ClientMiddlewareFactory):
+        def __init__(self):
+            self._tracer = trace.get_tracer(__name__)
+
+        def start_call(self, info):
+            span = self._tracer.start_span(f"client.{info.method}")
+            return ClientTracingMiddleware(span)
+
+    class ClientTracingMiddleware(flight.ClientMiddleware):
+        def __init__(self, span):
+            self._span = span
+
+        def sending_headers(self):
+            ctx = trace.set_span_in_context(self._span)
+            carrier = {}
+            inject(carrier=carrier, context=ctx)
+            return carrier
+
+        def call_completed(self, exception):
+            if exception:
+                self._span.record_exception(exception)
+                self._span.set_status(StatusCode.ERROR)
+                print(exception)
+            else:
+                self._span.set_status(StatusCode.OK)
+            self._span.end()
+
+**Step 2: define the server middleware:**
+
+.. testcode::
+
+    import pyarrow.flight as flight
+    from opentelemetry import trace
+    from opentelemetry.propagate import extract
+    from opentelemetry.trace.status import StatusCode
+
+    class ServerTracingMiddlewareFactory(flight.ServerMiddlewareFactory):
+        def __init__(self):
+            self._tracer = trace.get_tracer(__name__)
+
+        def start_call(self, info, headers):
+            # Continue the trace sent by the client; if the request carried
+            # no trace context, this starts a new root span.
+            context = extract(headers)
+            span = self._tracer.start_span(
+                f"server.{info.method}", context=context)
+            return ServerTracingMiddleware(span)
+
+    class ServerTracingMiddleware(flight.ServerMiddleware):
+        def __init__(self, span):
+            self._span = span
+
+        def call_completed(self, exception):
+            if exception:
+                self._span.record_exception(exception)
+                self._span.set_status(StatusCode.ERROR)
+                print(exception)
+            else:
+                self._span.set_status(StatusCode.OK)
+            self._span.end()
+
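On the server, ``extract(headers)`` performs roughly the inverse of the client-side injection. The sketch below is a simplified, hypothetical re-implementation of that step for the ``traceparent`` header only (no ``tracestate`` or baggage), written with the standard library so the parsing rules are visible. It assumes the ``headers`` dictionary maps lowercase header names to lists of values, as pyarrow passes them to ``start_call``, and it accepts both ``str`` and ``bytes`` values.

```python
import re

# version-trace_id-parent_id-flags, lowercase hex, as in W3C Trace Context.
_TRACEPARENT = re.compile(
    r"(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<parent_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})"
)


def parse_traceparent(headers):
    """Return (trace_id, parent_span_id, sampled) or None.

    ``headers`` is assumed to map lowercase header names to lists of
    values (str or bytes), like the dict given to start_call().
    """
    values = headers.get("traceparent") or []
    if len(values) != 1:
        return None  # missing, or ambiguous because it was sent twice
    value = values[0]
    if isinstance(value, bytes):
        value = value.decode("ascii", errors="replace")
    match = _TRACEPARENT.fullmatch(value.strip())
    if match is None or match["version"] != "00":
        return None  # this sketch only understands version 00
    trace_id = int(match["trace_id"], 16)
    parent_id = int(match["parent_id"], 16)
    if trace_id == 0 or parent_id == 0:
        return None  # all-zero IDs are invalid
    sampled = bool(int(match["flags"], 16) & 0x01)
    return trace_id, parent_id, sampled
```

There is no reason to replace ``extract`` in the example above; the sketch only shows which fields the server span inherits: the trace ID, and the client span's ID as its parent.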
+**Step 3: configure the trace exporter, processor, and provider:**
+
+Both the server and the client need to be configured with the OpenTelemetry SDK
+to record spans and export them. For this example we collect the spans into a
+Python list, but this is normally where you would set up export to a tracing
+backend such as `Jaeger`_. See `OpenTelemetry Exporters`_ for other exporters.
+
+As part of this, you need to define the resource that is producing the spans.
+At a minimum this is the service name, but it can include other information
+such as the hostname, process ID, service version, and operating system.
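As an illustration of a richer resource, the hypothetical helper below collects a few attributes under OpenTelemetry semantic-convention keys (``service.name``, ``service.version``, ``host.name``, ``process.pid``, ``os.type``). It uses only the standard library; which attributes to include is an assumption to adapt to your deployment.

```python
import os
import platform
import socket


def resource_attributes(service_name, service_version=None):
    """Build a resource attribute dict (hypothetical helper).

    Keys follow OpenTelemetry semantic conventions; values are read
    from the current process and host.
    """
    attributes = {
        "service.name": service_name,
        "host.name": socket.gethostname(),
        "process.pid": os.getpid(),
        # platform.system() returns e.g. "Linux", "Windows", "Darwin".
        "os.type": platform.system().lower(),
    }
    if service_version is not None:
        attributes["service.version"] = service_version
    return attributes
```

The resulting dictionary could be passed as ``Resource(attributes=resource_attributes("my-service", "1.0.0"))`` in place of the single-attribute resource in ``configure_tracing``.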
+
+.. _Jaeger: https://www.jaegertracing.io/
+.. _`OpenTelemetry Exporters`: https://opentelemetry.io/docs/instrumentation/python/exporters/
+
+.. testcode::
+
+    from opentelemetry import trace
+    from opentelemetry.sdk.trace import TracerProvider
+    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+    from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+
+    class TestSpanExporter(SpanExporter):
+        def __init__(self):
+            self.spans = []
+
+        def export(self, spans):
+            # Keep finished spans in memory so the example can inspect them.
+            self.spans.extend(spans)
+            return SpanExportResult.SUCCESS
+
+    def configure_tracing():
+        # A service name is required by most tracing backends. This in-memory
+        # exporter does not need one, but it is good practice to always set it.
+        resource = Resource(attributes={
+            SERVICE_NAME: "my-service"
+        })
+        exporter = TestSpanExporter()
+        provider = TracerProvider(resource=resource)
+        # SimpleSpanProcessor passes each span to the exporter
+        # as soon as the span ends.
+        processor = SimpleSpanProcessor(exporter)
+        provider.add_span_processor(processor)
+        trace.set_tracer_provider(provider)
+        return exporter
+
+**Step 4: add the middleware to the server:**
+
+We can now use the middleware with the ``EchoServer`` defined earlier.
+
+.. code-block::
+
+    if __name__ == '__main__':
+        exporter = configure_tracing()
+        server = EchoServer(
+            location="grpc://0.0.0.0:8816",
+            middleware={
+                "tracing": ServerTracingMiddlewareFactory()
+            },
+        )
+        server.serve()
+
+.. testcode::
+    :hide:
+
+    # Start the server for real in a background thread. The previous
+    # code block only shows the user how the server would be started.
+    import threading
+    exporter = configure_tracing()
+    server = EchoServer(
+        location="grpc://0.0.0.0:8816",
+        middleware={
+            "tracing": ServerTracingMiddlewareFactory()
+        },
+    )
+    t = threading.Thread(target=server.serve)
+    t.start()
+
+**Step 5: add the middleware to the client:**
+
+.. testcode::
+
+    client = flight.connect(
+        "grpc://0.0.0.0:8816",
+        middleware=[ClientTracingMiddlewareFactory()],
+    )
+
+**Step 6: use the client within active spans:**
+
+When we make a call with our client inside an OpenTelemetry span, our client
+middleware creates a child span for the client-side Flight call and propagates
+its context to the server. Our server middleware picks up that trace context
+and creates a span that is a child of the client-side span.
+
+.. testcode::
+
+    from opentelemetry import trace
+
+    # The client would normally also need to configure tracing, but in this
+    # example the client and server run in the same Python process.
+    # exporter = configure_tracing()
+
+    tracer = trace.get_tracer(__name__)
+
+    with tracer.start_as_current_span("hello_world"):
+        action = flight.Action("echo", b"Hello, world!")
+        # Call list() on do_action to drain all results.
+        list(client.do_action(action=action))
+
+    print(f"There are {len(exporter.spans)} spans.")
+    print("The span names are:\n  "
+          f"{[s.name for s in exporter.spans]}.")
+    print("The span status codes are:\n  "
+          f"{[s.status.status_code for s in exporter.spans]}.")
+
+.. testoutput::
+
+    There are 3 spans.
+    The span names are:
+      ['server.FlightMethod.DO_ACTION', 'client.FlightMethod.DO_ACTION', 'hello_world'].
+    The span status codes are:
+      [<StatusCode.OK: 1>, <StatusCode.OK: 1>, <StatusCode.UNSET: 0>].
+
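+As expected, we have three spans: one in our client code, one in the client
+middleware, and one in the server middleware. They are listed in the order in
+which they ended, because ``SimpleSpanProcessor`` exports each span as soon as
+it ends: the server span finishes first and the enclosing ``hello_world`` span
+last. The ``hello_world`` span's status is ``UNSET`` because nothing set it
+explicitly, whereas the middleware marks its spans ``OK``.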
+
+.. testcode::
+    :hide:
+
+    # Shut down the server
+    server.shutdown()
\ No newline at end of file
