dygger commented on issue #52574: URL: https://github.com/apache/airflow/issues/52574#issuecomment-3095991507
@sjyangkevin, thank you for looking into this issue.

The listener is defined in a separate file, `test_listener.py`, and is registered as part of an Airflow plugin (see `test_plugin.py`), as described in the [Listener Registration](https://airflow.apache.org/docs/apache-airflow/stable/howto/listener-plugin.html) section. Both files should be placed in the `$AIRFLOW_HOME/plugins/` folder.

### plugins/test_plugin.py

```python
import test_listener

from airflow.plugins_manager import AirflowPlugin


class TestPlugin(AirflowPlugin):
    name = "TestPlugin"
    listeners = [test_listener]
```

### plugins/test_listener.py

```python
from __future__ import annotations

import logging

from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.utils.state import TaskInstanceState

logger = logging.getLogger(__name__)


@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance
):
    logger.info(f"on_task_instance_success: {task_instance.task_id}")

    if isinstance(task_instance, RuntimeTaskInstance):
        context = task_instance.get_template_context()
        outlets = context["outlets"]
        logger.info(f"on_task_instance_success outlets: {outlets}")

        outlet_events = context["outlet_events"]
        for outlet in outlets:
            event = outlet_events[outlet]
            logger.info(f"on_task_instance_success outlet event: {event}")
```

### dags/outlet_test_dag.py

```python
from datetime import datetime

from airflow.sdk import DAG, Asset, task

asset = Asset("test-asset")

with DAG(
    dag_id="asset-producer",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
    is_paused_upon_creation=False,
    tags=["test"],
):

    @task(outlets=[asset])
    def produce(
        *,
        outlet_events=None,
    ):
        outlet_events[asset].extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
        print(f"Outlet events: {outlet_events}")

    produce()

with DAG(
    dag_id="asset-consumer",
    start_date=datetime(2021, 1, 1),
    schedule=[asset],
    catchup=False,
    is_paused_upon_creation=False,
    tags=["test"],
):

    @task(inlets=[asset])
    def consume(*, triggering_asset_events=None):
        for k, v in triggering_asset_events.items():
            print(f"[Consumer] triggering_asset_event: {k}: {v}")

    consume()
```
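
For anyone reproducing this, a quick way to confirm that both plugin files ended up in the right place is sketched below. This is only an illustration, not part of Airflow: `missing_plugin_files` and `EXPECTED_PLUGIN_FILES` are hypothetical names, and the sketch assumes that `AIRFLOW_HOME` falls back to `~/airflow` when unset and that the plugins folder has not been moved via configuration.

```python
from __future__ import annotations

import os
from pathlib import Path

# File names taken from the reproduction described in this comment.
EXPECTED_PLUGIN_FILES = ("test_plugin.py", "test_listener.py")


def missing_plugin_files(airflow_home: str | os.PathLike | None = None) -> list[str]:
    """Return the expected plugin files that are absent from <airflow_home>/plugins.

    Hypothetical helper for illustration only; it does not use any Airflow API.
    """
    # Assumption: AIRFLOW_HOME defaults to ~/airflow when the variable is unset,
    # and the plugins folder has not been overridden in the Airflow configuration.
    home = Path(airflow_home or os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    plugins_dir = home / "plugins"
    # Keep the order of EXPECTED_PLUGIN_FILES so the result is deterministic.
    return [name for name in EXPECTED_PLUGIN_FILES if not (plugins_dir / name).is_file()]
```

Calling `missing_plugin_files()` with no argument checks the folder derived from the environment; an empty list means both files were found. This only confirms the files exist, not that Airflow actually loaded the plugin.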
