dygger commented on issue #52574:
URL: https://github.com/apache/airflow/issues/52574#issuecomment-3095991507

   @sjyangkevin, thank you for looking into this issue.
   
   The listener is defined in a separate file `test_listener.py` and is 
registered as part of Airflow plugin (see `test_plugin.py`) as described at 
[Listener 
Registration](https://airflow.apache.org/docs/apache-airflow/stable/howto/listener-plugin.html)
 section. Both files should be placed in `$AIRFLOW_HOME/plugins/` folder.
   
   ### plugins/test_plugin.py
   ```python
   import test_listener
   from airflow.plugins_manager import AirflowPlugin
   
   
   class TestPlugin(AirflowPlugin):
       name = "TestPlugin"
       listeners = [test_listener]
   ```
   
   ### plugins/test_listener.py
   ```python
   from __future__ import annotations
   
   import logging
   
   from airflow.listeners import hookimpl
   from airflow.models.taskinstance import TaskInstance
   from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
   from airflow.utils.state import TaskInstanceState
   
   logger = logging.getLogger(__name__)
   
   
   @hookimpl
   def on_task_instance_success(
       previous_state: TaskInstanceState | None, task_instance: 
RuntimeTaskInstance | TaskInstance
   ):
       logger.info(f"on_task_instance_success: {task_instance.task_id}")
       if isinstance(task_instance, RuntimeTaskInstance):
           context = task_instance.get_template_context()
           outlets = context["outlets"]
           logger.info(f"on_task_instance_success outlets: {outlets}")
           outlet_events = context["outlet_events"]
           for outlet in outlets:
               event = outlet_events[outlet]
               logger.info(f"on_task_instance_success outlet event: {event}")
   
   ```
   
   ### dags/outlet_test_dag.py
   ```python
   from datetime import datetime
   
   from airflow.sdk import DAG, Asset, task
   
   asset = Asset("test-asset")
   
   with DAG(
       dag_id="asset-producer",
       start_date=datetime(2021, 1, 1),
       schedule=None,
       catchup=False,
       is_paused_upon_creation=False,
       tags=["test"],
   ):
   
       @task(outlets=[asset])
       def produce(
           *,
           outlet_events=None,
       ):
           outlet_events[asset].extra = {"name1": "value1", "nested_obj": 
{"name2": "value2"}}
           print(f"Outlet events: {outlet_events}")
   
       produce()
   
   
   with DAG(
       dag_id="asset-consumer",
       start_date=datetime(2021, 1, 1),
       schedule=[asset],
       catchup=False,
       is_paused_upon_creation=False,
       tags=["test"],
   ):
   
       @task(inlets=[asset])
       def consume(*, triggering_asset_events=None):
           for k, v in triggering_asset_events.items():
               print(f"[Consumer] triggering_asset_event: {k}: {v}")
   
       consume()
   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to