mtraynham edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599


   I'm using OpenTelemetry with Celery on our worker processes, so maybe this 
will help.
   
   We have a Python module that hooks into the Celery worker processes directly via a signal, as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).
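   
   Roughly, the pattern those docs describe looks like the sketch below (using `CeleryInstrumentor` from `opentelemetry-instrumentation-celery`; our actual handler further down swaps in different instrumentors):
   ```python
   from celery.signals import worker_process_init
   from opentelemetry.instrumentation.celery import CeleryInstrumentor
   
   
   @worker_process_init.connect(weak=False)
   def init_celery_tracing(*args, **kwargs):
       # Instrument each forked Celery pool process once it has been initialized.
       CeleryInstrumentor().instrument()
   ```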
  
   
   To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39) and is supplied via the [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options) environment variable.
   This lets Airflow import our module, which registers the Celery signal handler, [just before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74).
   We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal handler.
   
   *my_app/tracing.py*
   ```python
   import typing
   
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   from celery.signals import worker_process_init
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
   from opentelemetry.instrumentation.requests import RequestsInstrumentor
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import sampling
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
   
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
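   # Airflow imports this module via AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS, which
   # also registers the signal handler below.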
   
   
   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'}),
           sampler=sampling.ALWAYS_ON
       )
       tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(tracer_provider)
       GrpcInstrumentorClient().instrument()
       RequestsInstrumentor().instrument()
   ```
   
   Our worker process is then started with the following configuration option:
   ```shell
   export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
   ```
   
   To give spans a parent context, we embed an OpenTelemetry trace ID into the DAG run configuration from whatever service is executing the DAG.
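   
   A minimal sketch of the idea, using the OpenTelemetry propagation API (our actual code embeds the trace ID slightly differently, and the function names here are just illustrative):
   ```python
   import typing
   
   from opentelemetry import trace
   from opentelemetry.propagate import extract, inject
   
   
   def build_trigger_conf() -> typing.Dict[str, typing.Any]:
       """Called by the service triggering the DAG run: capture the current
       trace context into a JSON-serializable dict for the DAG run conf."""
       carrier: typing.Dict[str, str] = {}
       inject(carrier)  # writes e.g. a W3C 'traceparent' entry into the dict
       return {'otel': carrier}
   
   
   def my_task(**kwargs: typing.Any) -> None:
       """Inside an Airflow task: restore the parent context from dag_run.conf
       and start a child span under it."""
       conf = kwargs['dag_run'].conf or {}
       parent_context = extract(conf.get('otel', {}))
       tracer = trace.get_tracer(__name__)
       with tracer.start_as_current_span('my-task', context=parent_context):
           ...  # task body
   ```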
   
   There are, however, two strange things that occur with this.
   1. We would have preferred using the `BatchSpanProcessor`, which queues spans and exports them on a separate thread, but it seemed that not all spans were being exported to our OTLP collector and records were missing. Using the `SimpleSpanProcessor` did not have that issue. A sketch of the `BatchSpanProcessor` variant is included after the log excerpt below.
   2. The `OTLPSpanExporter` may cause the errors below. I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from the same issue. No tasks appear to be failing, and it largely seems to occur on workers that currently have no tasks running. Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue on the worker process that causes it? I attempted to increase Celery's timeout ([worker_proc_alive_timeout](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher like 20 seconds, but still saw the same issue.
   
   > [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
   > [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
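   
   For reference, the `BatchSpanProcessor` variant looked roughly like the sketch below. The explicit flush on `worker_process_shutdown` is only a guess at how the dropped spans might be avoided (the batch export thread may not get a chance to drain before the pool process exits); we haven't verified it.
   ```python
   import typing
   
   from celery.signals import worker_process_init, worker_process_shutdown
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import BatchSpanProcessor
   
   _tracer_provider: typing.Optional[TracerProvider] = None
   
   
   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       global _tracer_provider
       _tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'})
       )
       _tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(_tracer_provider)
   
   
   @worker_process_shutdown.connect(weak=False)  # type: ignore
   def flush_spans(*args: typing.Any, **kwargs: typing.Any) -> None:
       # Guess: explicitly drain the batch queue before the pool process exits,
       # otherwise queued spans may be lost when the worker is recycled.
       if _tracer_provider is not None:
           _tracer_provider.force_flush()
           _tracer_provider.shutdown()
   ```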
   

