GitHub user mxmrlt added a comment to the discussion: Propagated OpenTelemetry Trace Information to downstream KubernetesPodOperator jobs
Hi. For now you can do this with @howardyoo's OTEL provider (https://github.com/howardyoo/airflow_otel_provider) combined with aspect-oriented programming (AOP). It's not a perfect solution, but it works, and it's how I monitor my Airflow instance in Elastic APM. Of course I'd prefer this to be a native feature. Here's an example with the Kafka operator.

```python
import aspectlib
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

# Assumption: import path and default construction of the provider's hook
# were not shown in the original comment; check the provider README.
from airflow_provider_opentelemetry.hooks.otel import OtelHook

otel_hook = OtelHook()


@aspectlib.Aspect
def instrument_producer(*args, **kwargs):
    # Let the hook build the producer, then wrap it with OTEL instrumentation.
    producer = yield aspectlib.Proceed
    instrumentation = ConfluentKafkaInstrumentor()
    producer = instrumentation.instrument_producer(
        producer=producer, tracer_provider=otel_hook.tracer_provider
    )
    yield aspectlib.Return(producer)


@aspectlib.Aspect(bind=True)
def instrument_execute(cutpoint, *args, **kwargs):
    # Run the operator's execute() inside a span tied to the current DAG run.
    from airflow.operators.python import get_current_context

    dag_context = get_current_context()
    with otel_hook.start_as_current_span(
        name=cutpoint.__name__, dag_context=dag_context
    ):
        yield aspectlib.Proceed(*args, **kwargs)


# Patch the hook and the operator so every producer and execute() call is traced.
aspectlib.weave(KafkaProducerHook.get_producer, instrument_producer)
aspectlib.weave(target=ProduceToTopicOperator.execute, aspects=instrument_execute)

# KAFKA_TOPIC and prod_function are defined elsewhere in the DAG.
produce_treats = ProduceToTopicOperator(
    task_id="produce_treats",
    kafka_config_id="kafka_default",
    topic=KAFKA_TOPIC,
    producer_function=prod_function,
    producer_function_args=["{{ ti.xcom_pull(task_ids='get_number_of_treats')}}"],
    producer_function_kwargs={
        "pet_name": "{{ ti.xcom_pull(task_ids='get_your_pet_name')}}"
    },
    poll_timeout=10,
)
```

GitHub link: https://github.com/apache/airflow/discussions/43352#discussioncomment-11110035
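For anyone unfamiliar with the weaving step in the example, here is a rough plain-Python sketch of the same idea: replace a method with a wrapper that runs the original inside a span. This is a stand-in written with `functools`, not aspectlib itself, and `fake_span`, `weave_span`, and `DemoOperator` are hypothetical names used only for illustration (no Airflow or OpenTelemetry involved).

```python
import functools
from contextlib import contextmanager

events = []  # stands in for a tracing backend


@contextmanager
def fake_span(name):
    """Hypothetical stand-in for a real span context manager."""
    events.append(f"start {name}")
    try:
        yield
    finally:
        events.append(f"end {name}")


def weave_span(cls, method_name):
    """Replace cls.method_name with a wrapper that runs it inside a span."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def wrapper(self, *args, **kwargs):
        with fake_span(original.__name__):
            return original(self, *args, **kwargs)

    setattr(cls, method_name, wrapper)


class DemoOperator:
    """Hypothetical operator; not an Airflow class."""

    def execute(self, context):
        events.append("execute body")
        return "done"


# Patch the class once; every later execute() call is wrapped in a span.
weave_span(DemoOperator, "execute")
result = DemoOperator().execute(context={})
```

The operator code itself stays untouched, which is the main appeal of this approach until tracing is supported natively.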
