mtraynham edited a comment on issue #12771: URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599
I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help. We have a Python module that hooks into the Celery worker processes directly via a signal, as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html). To hook into Celery from Airflow, that module re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39) and is referenced through the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options). This makes Airflow load our module, which registers the Celery signal handler [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74). We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal handler.
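To illustrate why pointing the config option at a module is enough to get a signal handler registered, here is a minimal standard-library sketch of how a dotted path such as `package.module.ATTRIBUTE` can be resolved. The function name `resolve_dotted_path` is hypothetical and this is not Airflow's actual implementation; it only assumes that the option is treated as an import path, so that importing the module runs its top-level code as a side effect.

```python
import importlib
from typing import Any


def resolve_dotted_path(dotted_path: str) -> Any:
    """Import the module part of ``dotted_path`` and return the named attribute.

    Hypothetical helper for illustration only; not part of Airflow or Celery.
    """
    module_path, _, attribute = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted module path")
    # Importing the module executes all of its top-level statements. This is
    # the point at which a decorator such as @worker_process_init.connect
    # would register its handler.
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attribute)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_path!r} has no attribute {attribute!r}"
        ) from exc
```

Under that assumption, resolving `my_app.tracing.CELERY_CONFIG` returns the re-exported config dictionary, and the signal handler is connected simply because the module was imported on the way.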
*my_app/tracing.py*

```python
import typing

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from celery.signals import worker_process_init
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import sampling
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

# Re-exported so that AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS can point at this module.
CELERY_CONFIG = DEFAULT_CELERY_CONFIG


# weak=False keeps a strong reference to the handler so it is not garbage collected.
@worker_process_init.connect(weak=False)  # type: ignore
def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
    tracer_provider = TracerProvider(
        resource=Resource(attributes={'service.name': 'my-service'}),
        sampler=sampling.ALWAYS_ON
    )
    tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)
    GrpcInstrumentorClient().instrument()
    RequestsInstrumentor().instrument()
```

Our worker process is then started with the following configuration option:

```shell
export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
```

For parent contexts when executing spans, we embed an OpenTelemetry trace ID into the DAG run configuration from whichever service triggers the DAG.

There are, however, two strange things that occur with this.

1. We would have preferred the `BatchSpanProcessor`, as it queues up spans and exports them from a separate thread, but not all of the spans were being exported to our OTLP collector and records were missing. The `SimpleSpanProcessor` did not have that issue.
2. The `OTLPSpanExporter` may cause the errors shown below. I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from that issue.
It doesn't seem like any tasks are failing, and this largely seems to occur on workers that are not currently running any tasks. Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe a startup timing issue in the worker process causes it? I tried increasing Celery's timeout ([`worker_proc_alive_timeout`](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher, like 20 seconds, and still saw the same issue.

> [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
> [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
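On the parent-context point (embedding a trace ID into the DAG run configuration), here is a rough standard-library sketch of one way to carry that value. It is not our exact code. It assumes the value is stored under a `traceparent` key in the W3C Trace Context layout (`version-traceid-parentid-flags`) and that a parent span ID is passed along with the trace ID. The key name and both function names are hypothetical.

```python
import re
from typing import Any, Dict, Optional, Tuple

# W3C Trace Context layout: version "00", 32 hex digits of trace ID,
# 16 hex digits of parent span ID, 2 hex digits of flags.
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def embed_trace_context(
    conf: Dict[str, Any], trace_id: int, span_id: int, sampled: bool = True
) -> Dict[str, Any]:
    """Return a copy of the DAG run conf with a ``traceparent`` entry added.

    The "traceparent" key name is an assumption made for this sketch.
    """
    flags = 1 if sampled else 0
    enriched = dict(conf)
    enriched["traceparent"] = f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"
    return enriched


def read_trace_context(conf: Dict[str, Any]) -> Optional[Tuple[int, int, bool]]:
    """Return (trace_id, span_id, sampled), or None if absent or malformed."""
    match = _TRACEPARENT.match(str(conf.get("traceparent", "")))
    if match is None:
        return None
    trace_id, span_id, flags = (int(group, 16) for group in match.groups())
    # All-zero IDs are invalid in the W3C format, so treat them as missing.
    if trace_id == 0 or span_id == 0:
        return None
    return trace_id, span_id, bool(flags & 1)
```

The triggering service would call `embed_trace_context` before submitting the DAG run, and a task would call `read_trace_context` on the run's conf. From there, the IDs can be wrapped in an `opentelemetry.trace.SpanContext` (with `is_remote=True`), placed in a `NonRecordingSpan`, and turned into a parent context with `trace.set_span_in_context`, which is then passed as `context=` when starting the task's span.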