GitHub user mxmrlt added a comment to the discussion: Propagated OpenTelemetry 
Trace Information to downstream KubernetesPodOperator jobs

Hi.

For now you can do this with @howardyoo's OTEL provider
(https://github.com/howardyoo/airflow_otel_provider) combined with
aspect-oriented programming (AOP).

It's not a perfect solution, but it works: that's how I monitor my
Airflow instance in Elastic APM.
Of course, I'd still like this to be a native feature.

Here's an example that uses aspectlib to instrument the Kafka provider's
ProduceToTopicOperator.

```python
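import aspectlib
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

# Assumption: import path and connection ID follow the airflow_otel_provider
# README; adjust them to your setup.
from airflow_provider_opentelemetry.hooks.otel import OtelHook

otel_hook = OtelHook("otel_conn")

# KAFKA_TOPIC and prod_function are defined elsewhere in the DAG file.


@aspectlib.Aspect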
def instrument_producer(*args, **kwargs):
    # Let the original get_producer() run, then wrap the producer it returns
    # so that its calls are traced.
    producer = yield aspectlib.Proceed
    instrumentation = ConfluentKafkaInstrumentor()
    producer = instrumentation.instrument_producer(
        producer=producer, tracer_provider=otel_hook.tracer_provider
    )
    yield aspectlib.Return(producer)


@aspectlib.Aspect(bind=True)
def instrument_execute(cutpoint, *args, **kwargs):
    # This runs at task execution time, so the current task context exists.
    from airflow.operators.python import get_current_context

    dag_context = get_current_context()

    # Run the woven execute() inside a span named after it.
    with otel_hook.start_as_current_span(
        name=cutpoint.__name__, dag_context=dag_context
    ):
        yield aspectlib.Proceed(*args, **kwargs)


aspectlib.weave(KafkaProducerHook.get_producer, instrument_producer)
aspectlib.weave(
    target=ProduceToTopicOperator.execute, aspects=instrument_execute
)

produce_treats = ProduceToTopicOperator(
    task_id="produce_treats",
    kafka_config_id="kafka_default",
    topic=KAFKA_TOPIC,
    producer_function=prod_function,
    producer_function_args=["{{ ti.xcom_pull(task_ids='get_number_of_treats')}}"],
    producer_function_kwargs={
        "pet_name": "{{ ti.xcom_pull(task_ids='get_your_pet_name')}}"
    },
    poll_timeout=10,
)
```
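
If the weaving step looks opaque, here is a rough standard-library sketch of what it amounts to: the operator's `execute` method is replaced by a wrapper that opens a span, calls the original, and closes the span. This is not aspectlib or the OTEL provider. `fake_span`, `weave`, and `DemoOperator` are hypothetical names made up for the illustration, and the span is just a list entry rather than a real trace.

```python
import functools
from contextlib import contextmanager

# Stands in for a tracing backend: each entry is ("start" | "end", span_name).
recorded_spans = []


@contextmanager
def fake_span(name):
    """Hypothetical stand-in for otel_hook.start_as_current_span."""
    recorded_spans.append(("start", name))
    try:
        yield
    finally:
        recorded_spans.append(("end", name))


def weave(cls, method_name):
    """Replace cls.<method_name> with a version that runs inside a span."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def wrapper(self, *args, **kwargs):
        with fake_span(original.__name__):
            return original(self, *args, **kwargs)

    setattr(cls, method_name, wrapper)


class DemoOperator:
    """Hypothetical operator used only for this sketch; not an Airflow class."""

    def execute(self, context):
        return f"ran with {context}"


# Patch the class once; every instance created afterwards is instrumented.
weave(DemoOperator, "execute")
result = DemoOperator().execute({"ds": "2024-01-01"})
```

Because the patch is applied to the class rather than to one instance, it has to run before the tasks execute, which is why the `aspectlib.weave(...)` calls above sit at module level in the DAG file.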

GitHub link: 
https://github.com/apache/airflow/discussions/43352#discussioncomment-11110035
