This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git
The following commit(s) were added to refs/heads/main by this push:
new 55f7d6d [Python][Flight] Show how to propagate opentelemetry spans to Flight (#299)
55f7d6d is described below
commit 55f7d6dcf7888e4aa4af84cdbd659bfa6acd7d58
Author: Will Jones <[email protected]>
AuthorDate: Thu Mar 16 12:19:28 2023 -0700
[Python][Flight] Show how to propagate opentelemetry spans to Flight (#299)
Closes #297
---------
Co-authored-by: Bryce Mecum <[email protected]>
---
python/requirements.txt | 2 +
python/source/flight.rst | 217 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 218 insertions(+), 1 deletion(-)
diff --git a/python/requirements.txt b/python/requirements.txt
index 8d2a3f0..1d6e61a 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,3 +1,5 @@
Sphinx>=4.0.2
pyarrow==10.0.1
pandas>=1.2.5
+opentelemetry-api>=1.0.0
+opentelemetry-sdk>=1.0.0
\ No newline at end of file
diff --git a/python/source/flight.rst b/python/source/flight.rst
index 6343e2b..67c359c 100644
--- a/python/source/flight.rst
+++ b/python/source/flight.rst
@@ -752,4 +752,219 @@ Running the client script, you should see the server printing out information ab
.. _IBM article: https://www.ibm.com/docs/en/arl/9.7?topic=certification-extracting-certificate-keys-from-pfx-file
.. _Windows: https://docs.microsoft.com/en-us/dotnet/core/additional-tools/self-signed-certificates-guide
.. _Arrow testing data repository: https://github.com/apache/arrow-testing/tree/master/data/flight
-.. _openssl: https://www.ibm.com/docs/en/api-connect/2018.x?topic=overview-generating-self-signed-certificate-using-openssl
\ No newline at end of file
+.. _openssl: https://www.ibm.com/docs/en/api-connect/2018.x?topic=overview-generating-self-signed-certificate-using-openssl
+
+Propagating OpenTelemetry Traces
+================================
+
+Distributed tracing with OpenTelemetry_ lets you collect call-level performance
+measurements across a Flight service. To correlate spans across a Flight
+client and server, the trace context must be passed between the two. It can be
+passed manually through headers in :class:`pyarrow.flight.FlightCallOptions`,
+or propagated automatically using middleware.
+
+This example shows how to accomplish trace propagation through middleware.
+The client middleware needs to inject the trace context into the call headers.
+The server middleware needs to extract the trace context from the headers and
+use it as the parent context of a new span. Optionally, the client middleware
+can also create its own span to time the client side of the call.
+
+.. _OpenTelemetry: https://opentelemetry.io/docs/instrumentation/python/getting-started/
+
+**Step 1: define the client middleware:**
+
+.. testcode::
+
+    import pyarrow.flight as flight
+    from opentelemetry import trace
+    from opentelemetry.propagate import inject
+    from opentelemetry.trace.status import StatusCode
+
+    class ClientTracingMiddlewareFactory(flight.ClientMiddlewareFactory):
+        def __init__(self):
+            self._tracer = trace.get_tracer(__name__)
+
+        def start_call(self, info):
+            span = self._tracer.start_span(f"client.{info.method}")
+            return ClientTracingMiddleware(span)
+
+    class ClientTracingMiddleware(flight.ClientMiddleware):
+        def __init__(self, span):
+            self._span = span
+
+        def sending_headers(self):
+            ctx = trace.set_span_in_context(self._span)
+            carrier = {}
+            inject(carrier=carrier, context=ctx)
+            return carrier
+
+        def call_completed(self, exception):
+            if exception:
+                self._span.record_exception(exception)
+                self._span.set_status(StatusCode.ERROR)
+                print(exception)
+            else:
+                self._span.set_status(StatusCode.OK)
+            self._span.end()
+
+**Step 2: define the server middleware:**
+
+.. testcode::
+
+    import pyarrow.flight as flight
+    from opentelemetry import trace
+    from opentelemetry.propagate import extract
+    from opentelemetry.trace.status import StatusCode
+
+    class ServerTracingMiddlewareFactory(flight.ServerMiddlewareFactory):
+        def __init__(self):
+            self._tracer = trace.get_tracer(__name__)
+
+        def start_call(self, info, headers):
+            context = extract(headers)
+            span = self._tracer.start_span(
+                f"server.{info.method}", context=context)
+            return ServerTracingMiddleware(span)
+
+    class ServerTracingMiddleware(flight.ServerMiddleware):
+        def __init__(self, span):
+            self._span = span
+
+        def call_completed(self, exception):
+            if exception:
+                self._span.record_exception(exception)
+                self._span.set_status(StatusCode.ERROR)
+                print(exception)
+            else:
+                self._span.set_status(StatusCode.OK)
+            self._span.end()
+
+**Step 3: configure the trace exporter, processor, and provider:**
+
+Both the server and client need to be configured with the OpenTelemetry SDK
+to record spans and export them somewhere. For the sake of the example, we'll
+collect the spans into a Python list, but this is normally where you would set
+them up to be exported to a tracing backend such as `Jaeger`_. See other
+examples of exporters at `OpenTelemetry Exporters`_.
+
+As part of this, you will need to define the resource where spans are running.
+At a minimum this is the service name, but it could include other information
+such as the hostname, process ID, service version, and operating system.
+
+.. _Jaeger: https://www.jaegertracing.io/
+.. _`OpenTelemetry Exporters`: https://opentelemetry.io/docs/instrumentation/python/exporters/
+
+.. testcode::
+
+    from opentelemetry import trace
+    from opentelemetry.sdk.trace import TracerProvider
+    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+    from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+
+    class TestSpanExporter(SpanExporter):
+        def __init__(self):
+            self.spans = []
+
+        def export(self, spans):
+            self.spans.extend(spans)
+            return SpanExportResult.SUCCESS
+
+    def configure_tracing():
+        # Service name is required by most backends, and although it's not
+        # needed for this in-memory exporter, it's good practice to set it.
+        resource = Resource(attributes={
+            SERVICE_NAME: "my-service"
+        })
+        exporter = TestSpanExporter()
+        provider = TracerProvider(resource=resource)
+        processor = SimpleSpanProcessor(exporter)
+        provider.add_span_processor(processor)
+        trace.set_tracer_provider(provider)
+        return exporter
+
+**Step 4: add the middleware to the server:**
+
+We can now use the middleware in the EchoServer defined earlier.
+
+.. code-block::
+
+    if __name__ == '__main__':
+        exporter = configure_tracing()
+        server = EchoServer(
+            location="grpc://0.0.0.0:8816",
+            middleware={
+                "tracing": ServerTracingMiddlewareFactory()
+            },
+        )
+        server.serve()
+
+.. testcode::
+    :hide:
+
+    # Start the server for real in a background thread.
+    # The previous code block only shows the user how to start it.
+    import threading
+    exporter = configure_tracing()
+    server = EchoServer(
+        location="grpc://0.0.0.0:8816",
+        middleware={
+            "tracing": ServerTracingMiddlewareFactory()
+        },
+    )
+    t = threading.Thread(target=server.serve)
+    t.start()
+
+**Step 5: add the middleware to the client:**
+
+.. testcode::
+
+    client = pa.flight.connect(
+        "grpc://0.0.0.0:8816",
+        middleware=[ClientTracingMiddlewareFactory()],
+    )
+
+**Step 6: use the client within active spans:**
+
+When we make a call with our client within an OpenTelemetry span, our client
+middleware will create a child span for the client-side Flight call and then
+propagate the span context to the server. Our server middleware will pick up
+that trace context and create another child span.
+
+.. testcode::
+
+    from opentelemetry import trace
+
+    # The client would normally configure tracing too, but in this example
+    # the client and server run in the same Python process.
+    # exporter = configure_tracing()
+
+    tracer = trace.get_tracer(__name__)
+
+    with tracer.start_as_current_span("hello_world") as span:
+        action = pa.flight.Action("echo", b"Hello, world!")
+        # Call list() on do_action to drain all results.
+        list(client.do_action(action=action))
+
+    print(f"There are {len(exporter.spans)} spans.")
+    print(f"The span names are:\n "
+          f"{list(span.name for span in exporter.spans)}.")
+    print(f"The span status codes are:\n "
+          f"{list(span.status.status_code for span in exporter.spans)}.")
+
+.. testoutput::
+
+    There are 3 spans.
+    The span names are:
+     ['server.FlightMethod.DO_ACTION', 'client.FlightMethod.DO_ACTION', 'hello_world'].
+    The span status codes are:
+     [<StatusCode.OK: 1>, <StatusCode.OK: 1>, <StatusCode.UNSET: 0>].
+
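+As expected, we have three spans: one in our client code, one in the client
+middleware, and one in the server middleware. Spans are exported as they end,
+so the server span is listed first and the enclosing ``hello_world`` span last.
+The ``hello_world`` span's status is ``UNSET`` because, unlike the middleware,
+our client code never sets a status explicitly.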
+
+.. testcode::
+    :hide:
+
+    # Shut down the server
+    server.shutdown()
\ No newline at end of file
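For reference, with the OpenTelemetry Python SDK's default propagators (W3C Trace Context plus W3C Baggage), the carrier that the client middleware's `inject` call fills typically holds a `traceparent` header of the form `00-<trace-id>-<parent-span-id>-<flags>`. The sketch below builds and parses that header by hand to show what actually crosses the wire between the Flight client and server. It does not use the OpenTelemetry API; `make_traceparent` and `parse_traceparent` are hypothetical helpers written for illustration, and in the recipe `inject` and `extract` do this work.

```python
import re
import secrets

# Version "00" of the W3C Trace Context traceparent header:
# <version>-<32 hex trace-id>-<16 hex parent-id>-<2 hex trace-flags>
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def make_traceparent(trace_id, span_id, sampled=True):
    """Format a traceparent value (hypothetical helper, not an OpenTelemetry API)."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(value):
    """Return (trace_id, parent_span_id, sampled), or None if the value is invalid."""
    match = _TRACEPARENT_RE.match(value.strip().lower())
    if match is None:
        return None
    _version, trace_hex, span_hex, flags_hex = match.groups()
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    # All-zero trace IDs and span IDs are invalid per the specification.
    if trace_id == 0 or span_id == 0:
        return None
    # The lowest flag bit is the "sampled" flag.
    return trace_id, span_id, bool(int(flags_hex, 16) & 0x01)


# Client side: random IDs for the client span, written into the carrier.
client_trace_id = secrets.randbits(128) or 1
client_span_id = secrets.randbits(64) or 1
carrier = {"traceparent": make_traceparent(client_trace_id, client_span_id)}

# Server side: recover the IDs so the server span joins the same trace,
# with the client span as its parent.
trace_id, parent_span_id, sampled = parse_traceparent(carrier["traceparent"])
print(trace_id == client_trace_id, parent_span_id == client_span_id, sampled)
```

Running it prints `True True True`: the server side recovers the client's trace ID and treats the client's span ID as its parent, which is how spans on both sides of a Flight call end up in one trace.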
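The recipe notes that trace context can also be passed manually through headers in `pyarrow.flight.FlightCallOptions` instead of through client middleware. The sketch below shows only the header conversion involved, assuming (as the pyarrow documentation describes) that `FlightCallOptions(headers=...)` accepts a list of `(key, value)` tuples and that server middleware receives headers as a dict mapping lowercase keys to lists of values. Both helpers are hypothetical; keys are lowercased defensively because gRPC metadata keys must be lowercase. The pyarrow calls appear only in comments so the sketch runs without a server.

```python
def carrier_to_call_headers(carrier):
    """Convert a carrier filled by opentelemetry.propagate.inject into a
    list of (key, value) byte tuples for FlightCallOptions(headers=...).

    Hypothetical helper: keys are lowercased because gRPC metadata keys
    must be lowercase.
    """
    return [
        (key.lower().encode("ascii"), value.encode("ascii"))
        for key, value in sorted(carrier.items())
    ]


def call_headers_to_carrier(headers):
    """Collapse server-side headers of the form {key: [value, ...]} into a
    plain {key: value} dict, keeping the first value of each header.

    Hypothetical helper; bytes values are decoded as ASCII.
    """
    carrier = {}
    for key, values in headers.items():
        if values:
            value = values[0]
            carrier[key] = value.decode("ascii") if isinstance(value, bytes) else value
    return carrier


# Client side (sketch): after inject(carrier), something like
#   options = flight.FlightCallOptions(headers=carrier_to_call_headers(carrier))
#   client.do_action(action, options=options)
sent = carrier_to_call_headers(
    {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
)
print(sent)

# Server side (sketch): gRPC delivers each header as a list of values.
received = {key.decode("ascii"): [value.decode("ascii")] for key, value in sent}
print(call_headers_to_carrier(received))
```

The round trip recovers the original carrier. On the server, something still has to read the incoming headers (in this cookbook, the server middleware's `start_call`), so manual propagation mainly changes the client side.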
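Step 3 says a resource can describe more than the service name, for example the hostname, process ID, service version, and operating system. A minimal sketch of collecting those values with the standard library, keyed by OpenTelemetry semantic-convention attribute names (`service.name`, `service.version`, `host.name`, `process.pid`, `os.type`). The helper `resource_attributes` is hypothetical; its result is meant to replace the single-entry dict passed to `Resource(attributes=...)` in `configure_tracing`.

```python
import os
import platform
import socket


def resource_attributes(service_name, service_version=None):
    """Build resource attributes keyed by OpenTelemetry semantic-convention names.

    Hypothetical helper: pass the result to
    opentelemetry.sdk.resources.Resource(attributes=...).
    """
    attributes = {
        "service.name": service_name,
        "host.name": socket.gethostname(),
        "process.pid": os.getpid(),
        # platform.system() returns e.g. "Linux", "Darwin" or "Windows";
        # the os.type convention uses lowercase values.
        "os.type": platform.system().lower(),
    }
    if service_version is not None:
        attributes["service.version"] = service_version
    return attributes


print(resource_attributes("my-service", service_version="1.0.0"))
```

Keeping the service version optional means a development build can omit it, while a deployed service reports which release produced each span.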