GitHub user sona26G added a comment to the discussion: Why is my TaskInstance Listener not Working?

I have deployed Apache Airflow 2.8.3 on Kubernetes with the Kubernetes executor. I have also created a custom event listener to capture dag_run and task_instance events, and I have uploaded the listener files to /opt/airflow/plugins in the scheduler pod. When I run a custom DAG I receive the DAG-related events, but the task instance events never arrive. Is there something I'm missing? Do I need to put the event listener in the individual pods that come up for each DAG task? I assumed the scheduler would be able to capture both DAG and task events.
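For reference, the listener module is registered through a standard Airflow plugin. A minimal sketch of that wiring, assuming the listener file sits under /opt/airflow/plugins (module and class names here are illustrative, not the actual ones):

```python
# plugins/event_logging_plugin/__init__.py  (illustrative layout)
from airflow.plugins_manager import AirflowPlugin

# hypothetical import path for the listener module shown below
from event_logging_plugin import event_listener


class EventLoggingPlugin(AirflowPlugin):
    name = "event_logging_plugin"
    # registering the module lets Airflow invoke its @hookimpl functions
    listeners = [event_listener]
```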

Attaching the full event listener code below:

```python
from __future__ import annotations

import asyncio
import logging
import pathlib
from configparser import ConfigParser
from typing import TYPE_CHECKING

from airflow.listeners import hookimpl

from event_logging_plugin.ad_nats_client import AdNatsClient

if TYPE_CHECKING:
    from sqlalchemy.orm.session import Session
    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance
    from airflow.utils.state import TaskInstanceState
    from airflow.sdk.definitions.asset import Asset, AssetAlias

logging.basicConfig(filename="airflow_event_listener.log",
                    filemode='a',
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    level=logging.INFO)
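
# config.ini is expected next to this file, roughly like (values illustrative):
#
#   [pulse]
#   ad.events.url = nats://nats-0:4222,nats://nats-1:4222
#   ad.cluster = my-cluster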

config_path = pathlib.Path(__file__).parent.absolute() / "config.ini"
parser = ConfigParser()
parser.read(config_path)

nats_client = None

def get_nats_client():
    """Lazily create a single NATS client per process and reuse it."""
    global nats_client

    if nats_client is None:
        raw = parser.get("pulse", "ad.events.url")
        servers = [s.strip() for s in raw.split(",")]
        nats_client = AdNatsClient(servers)

    return nats_client

async def publish_to_nats(event_detail: dict):
    try:
        cluster = parser.get("pulse", "ad.cluster")
        subject = f"airflow_events_{cluster}"

        client = get_nats_client()
        await client.publish_event(subject, event_detail)

    except Exception as e:
        logging.error("Failed pushing event to NATS: %s", e)


def post_event(event_detail: dict):
    """Bridge the synchronous listener hooks into the async NATS publisher."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # no loop in this synchronous context → run the coroutine directly
        logging.debug("No event loop → using asyncio.run()")
        asyncio.run(publish_to_nats(event_detail))
    else:
        # a loop is already running → schedule the publish on it
        logging.debug("Event loop already running → using create_task()")
        loop.create_task(publish_to_nats(event_detail))



def convert_to_millis(dt):
    """Convert a datetime object to milliseconds since the epoch."""
    if dt is None:
        return None
    return int(dt.timestamp() * 1000)
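
# Worked example (assuming an aware UTC datetime):
#   convert_to_millis(datetime(2024, 1, 1, tzinfo=timezone.utc)) -> 1704067200000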


if parser.has_option("pulse", "ad.cluster"):
    cluster = parser.get("pulse", "ad.cluster")
else:
    cluster = "UnknownCluster"
    logging.error("Missing cluster in config.ini")

connectorContext = {
    "monitor": "airflow",
    "group": cluster,
    "userId": "",
    "uuid": ""
}


def get_connector_context():
    group = connectorContext.get("group")
    if not group:
        logging.error("Missing group in connector context")
        connectorContext["group"] = "UnknownGroup"
        return connectorContext
    return connectorContext


def extract_task_instance_details(event: str, previous_state: TaskInstanceState,
                                  task_instance: TaskInstance):
    eventType = "airflow_task_instance"
    try:
        details = {
            "eventType": eventType,
            "event": event,
            "task_id": task_instance.task_id,
            "dag_id": task_instance.dag_id,
            "run_id": task_instance.run_id,
            "previous_state": str(previous_state),
            "map_index": task_instance.map_index,
            "start_date": convert_to_millis(task_instance.start_date),
            "end_date": convert_to_millis(task_instance.end_date),
            "duration": task_instance.duration,
            "state": task_instance.state,
            "max_tries": task_instance.max_tries,
            "hostname": task_instance.hostname,
            "unixname": task_instance.unixname,
            "job_id": task_instance.job_id,
            "pool": task_instance.pool,
            "pool_slots": task_instance.pool_slots,
            "queue": task_instance.queue,
            "priority_weight": task_instance.priority_weight,
            "operator": task_instance.operator,
            "custom_operator_name": task_instance.custom_operator_name,
            "queued_dttm": convert_to_millis(task_instance.queued_dttm),
            "queued_by_job_id": task_instance.queued_by_job_id,
            "pid": task_instance.pid,
            "executor_config": task_instance.executor_config,
            "updated_at": convert_to_millis(task_instance.updated_at),
            "external_executor_id": task_instance.external_executor_id,
            "trigger_id": task_instance.trigger_id,
            "trigger_timeout": str(task_instance.trigger_timeout),
            "next_method": task_instance.next_method,
            "next_kwargs": task_instance.next_kwargs,
        }
        adevent = {
            "event": details,
        }
        return adevent
    except Exception as e:
        error_details = {
            "eventType": eventType,
            "event": event,
            "message": "Exception while parsing: " + str(e),
            "instance_details": str(task_instance)
        }
        adevent = {
            "event": error_details,
        }
        return adevent
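
# Resulting payload shape (abridged; values illustrative):
#   {"event": {"eventType": "airflow_task_instance",
#              "event": "on_task_instance_running",
#              "task_id": "...", "dag_id": "...", "state": "running", ...}}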


@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState,
                             task_instance: TaskInstance,
                             session: Session = None):
    """
    This method is called when task state changes to RUNNING.
    """
    event_detail = extract_task_instance_details("on_task_instance_running",
                                                 previous_state, task_instance)
    logging.info("Pushing to nats for task instance event")
    post_event(event_detail)


@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState,
                             task_instance: TaskInstance,
                             session: Session = None):
    """
    This method is called when task state changes to SUCCESS.
    """
    logging.info("HOOK-FIRED: on_task_instance_success")

    try:
        event_detail = extract_task_instance_details("on_task_instance_success",
                                                     previous_state, task_instance)
        post_event(event_detail)
    except Exception:
        logging.exception("Exception in on_task_instance_success")


@hookimpl
def on_task_instance_failed(previous_state: TaskInstanceState,
                            task_instance: TaskInstance,
                            session: Session = None):
    """
    This method is called when task state changes to FAILED.
    """
    details = extract_task_instance_details("on_task_instance_failed",
                                            previous_state, task_instance)
    post_event(details)


def extract_dag_run_instance_details(event: str, dag_run: DagRun):
    eventType = "airflow_dag_run_instance"
    try:
        details = {
            "eventType": eventType,
            "event": event,
            "dag_id": dag_run.dag_id,
            "queued_at": convert_to_millis(dag_run.queued_at),
            "execution_date": convert_to_millis(dag_run.execution_date),
            "start_date": convert_to_millis(dag_run.start_date),
            "end_date": convert_to_millis(dag_run.end_date),
            "state": str(dag_run.state),
            "run_id": dag_run.run_id,
            "creating_job_id": dag_run.creating_job_id,
            "external_trigger": dag_run.external_trigger,
            "run_type": dag_run.run_type,
            "conf": str(dag_run.conf),
            "data_interval_start": 
convert_to_millis(dag_run.data_interval_start),
            "data_interval_end": convert_to_millis(dag_run.data_interval_end),
            "last_scheduling_decision": 
convert_to_millis(dag_run.last_scheduling_decision),
            "dag_hash": dag_run.dag_hash,
            "log_template_id": dag_run.log_template_id,
            "updated_at": convert_to_millis(dag_run.updated_at),
            "clear_number": dag_run.clear_number
        }
        adevent = {
            "event": details,
        }
        return adevent
    except Exception as e:
        error_details = {
            "eventType": eventType,
            "event": event,
            "message": "Exception parsing: " + str(e),
            "dag_run_details": str(dag_run)
        }
        adevent = {
            "event": error_details,
        }
        return adevent


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    details = extract_dag_run_instance_details("on_dag_run_success", dag_run)
    logging.info("Pushing to nats for dag run event: %s", dag_run)
    post_event(details)


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    details = extract_dag_run_instance_details("on_dag_run_failed", dag_run)
    post_event(details)


@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    details = extract_dag_run_instance_details("on_dag_run_running", dag_run)
    post_event(details)


def extract_asset_details(event: str, asset: Asset):
    eventType = "airflow_asset_instance"
    try:
        details = {
            "eventType": eventType,
            "event": event,
            "asset_data": str(asset),
        }
        adevent = {
            "event": details,
        }
        return adevent
    except Exception as e:
        error_details = {
            "eventType": eventType,
            "event": event,
            "message": "Exception parsing: " + str(e),
            "asset_details": str(asset)
        }
        adevent = {
            "event": error_details,
        }
        return adevent



@hookimpl
def on_asset_created(asset: Asset):
    """Execute when a new asset is created."""
    details = extract_asset_details("on_asset_created", asset)
    logging.info("Pushing to nats for asset created event")
    post_event(details)



@hookimpl
def on_asset_alias_created(asset_alias: AssetAlias):
    """Execute when a new asset alias is created."""


@hookimpl
def on_asset_changed(asset: Asset):
    """Execute when asset change is registered."""
    details = extract_asset_details("on_asset_changed", asset)
    logging.info("Pushing to nats for asset changed event")
    post_event(details)
```
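
For anyone reproducing this: a quick way to sanity-check the NATS side outside Airflow would be something like the snippet below (the import path is illustrative; adjust it to wherever the listener file actually lives):

```python
# smoke_test.py — manually verify the module can publish, independent of Airflow
from event_logging_plugin import event_listener  # hypothetical module name

event_listener.post_event({"eventType": "smoke_test", "event": "manual_check"})
```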

GitHub link: 
https://github.com/apache/airflow/discussions/33683#discussioncomment-15010973
