This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1456f11a99a1a3fe5f8513b4082a89e64e417866 Author: Bowrna <[email protected]> AuthorDate: Sat Jan 21 02:58:27 2023 +0530 listener plugin example added (#27905) (cherry picked from commit 100bb8d79a1e0c5fe6fca4b69c529b447cc992d1) --- airflow/example_dags/plugins/event_listener.py | 156 ++++++++++++++++++++++++ airflow/example_dags/plugins/listener_plugin.py | 26 ++++ docs/apache-airflow/howto/index.rst | 1 + docs/apache-airflow/howto/listener-plugin.rst | 95 +++++++++++++++ docs/spelling_wordlist.txt | 1 + 5 files changed, 279 insertions(+) diff --git a/airflow/example_dags/plugins/event_listener.py b/airflow/example_dags/plugins/event_listener.py new file mode 100644 index 0000000000..2e2d01800b --- /dev/null +++ b/airflow/example_dags/plugins/event_listener.py @@ -0,0 +1,156 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.listeners import hookimpl + +if TYPE_CHECKING: + from airflow.models.dagrun import DagRun + from airflow.models.taskinstance import TaskInstance + from airflow.utils.state import TaskInstanceState + + +# [START howto_listen_ti_running_task] +@hookimpl +def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session): + """ + This method is called when the task instance state changes to RUNNING. + Through this callback, ``previous_state`` and the ``task_instance`` object can be accessed. + From ``task_instance`` you can reach more information about the running task instance: + its dag_run, task and dag. + """ + print("Task instance is in running state") + print(" Previous state of the Task instance:", previous_state) + + state: TaskInstanceState = task_instance.state + name: str = task_instance.task_id + start_date = task_instance.start_date + + dagrun = task_instance.dag_run + dagrun_status = dagrun.state + + task = task_instance.task + + dag = task.dag + dag_name = None + if dag: + dag_name = dag.dag_id + print(f"Current task name:{name} state:{state} start_date:{start_date}") + print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}") + + +# [END howto_listen_ti_running_task] + +# [START howto_listen_ti_success_task] +@hookimpl +def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session): + """ + This method is called when the task instance state changes to SUCCESS. + Through this callback, ``previous_state`` and the ``task_instance`` object can be accessed. + From ``task_instance`` you can reach more information about the succeeded task instance: + its dag_run, task and dag.
+ """ + print("Task instance in success state") + print(" Previous state of the Task instance:", previous_state) + + dag_id = task_instance.dag_id + hostname = task_instance.hostname + operator = task_instance.operator + + dagrun = task_instance.dag_run + queued_at = dagrun.queued_at + print(f"Dag name:{dag_id} queued_at:{queued_at}") + print(f"Task hostname:{hostname} operator:{operator}") + + +# [END howto_listen_ti_success_task] + +# [START howto_listen_ti_failure_task] +@hookimpl +def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session): + """ + This method is called when the task instance state changes to FAILED. + Through this callback, ``previous_state`` and the ``task_instance`` object can be accessed. + From ``task_instance`` you can reach more information about the failed task instance: + its dag_run, task and dag. + """ + print("Task instance in failure state") + + start_date = task_instance.start_date + end_date = task_instance.end_date + duration = task_instance.duration + + dagrun = task_instance.dag_run + + task = task_instance.task + + dag = task_instance.task.dag + + print(f"Task start:{start_date} end:{end_date} duration:{duration}") + print(f"Task:{task} dag:{dag} dagrun:{dagrun}") + + +# [END howto_listen_ti_failure_task] + +# [START howto_listen_dagrun_success_task] +@hookimpl +def on_dag_run_success(dag_run: DagRun, message: str): + """ + This method is called when the dag run state changes to SUCCESS. + """ + print("Dag run in success state") + start_date = dag_run.start_date + end_date = dag_run.end_date + + print(f"Dag run start:{start_date} end:{end_date}") + + +# [END howto_listen_dagrun_success_task] + +# [START howto_listen_dagrun_failure_task] +@hookimpl +def on_dag_run_failed(dag_run: DagRun, message: str): + """ + This method is called when the dag run state changes to FAILED.
+ """ + print("Dag run in failure state") + dag_id = dag_run.dag_id + run_id = dag_run.run_id + external_trigger = dag_run.external_trigger + + print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}") + + +# [END howto_listen_dagrun_failure_task] + +# [START howto_listen_dagrun_running_task] +@hookimpl +def on_dag_run_running(dag_run: DagRun, message: str): + """ + This method is called when the dag run state changes to RUNNING. + """ + print("Dag run in running state") + queued_at = dag_run.queued_at + dag_hash_info = dag_run.dag_hash + + print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}") + + +# [END howto_listen_dagrun_running_task] diff --git a/airflow/example_dags/plugins/listener_plugin.py b/airflow/example_dags/plugins/listener_plugin.py new file mode 100644 index 0000000000..a365d57e3f --- /dev/null +++ b/airflow/example_dags/plugins/listener_plugin.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +from __future__ import annotations + +from airflow.example_dags.plugins import event_listener +from airflow.plugins_manager import AirflowPlugin + + +class MetadataCollectionPlugin(AirflowPlugin): + name = "MetadataCollectionPlugin" + listeners = [event_listener] # module whose @hookimpl functions are registered as listeners via this plugin diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst index 5d6b54fd77..7a170828d5 100644 --- a/docs/apache-airflow/howto/index.rst +++ b/docs/apache-airflow/howto/index.rst @@ -37,6 +37,7 @@ configuring an Airflow environment. operator/index timetable custom-view-plugin + listener-plugin customize-ui custom-operator create-custom-decorator diff --git a/docs/apache-airflow/howto/listener-plugin.rst b/docs/apache-airflow/howto/listener-plugin.rst new file mode 100644 index 0000000000..7b46a9de8a --- /dev/null +++ b/docs/apache-airflow/howto/listener-plugin.rst @@ -0,0 +1,95 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Listener Plugin of Airflow +========================== + +Airflow has feature that allows to add listener for monitoring and tracking +the task state using Plugins.
+ +This is a simple example of an Airflow listener plugin that helps to track the task +state and collect useful metadata about the task, dag run and dag. + +This example shows how to create a listener plugin for Airflow. +This plugin works by using SQLAlchemy's event mechanism. It watches +task instance state changes at the table level and triggers events. +It is notified for all tasks across all DAGs. + +In this plugin, the plugin class is derived from the base class +``airflow.plugins_manager.AirflowPlugin``. + +The listener plugin uses pluggy under the hood. Pluggy is a library built for plugin +management and hook calling for Pytest. Pluggy enables function hooking, which allows +building "pluggable" systems with your own customization over that hooking. + +Using this plugin, the following events can be listened to: + * a task instance is in the running state. + * a task instance is in the success state. + * a task instance is in the failure state. + * a dag run is in the running state. + * a dag run is in the success state. + * a dag run is in the failure state. + * before a component such as an Airflow job, scheduler or backfilljob starts + * before a component such as an Airflow job, scheduler or backfilljob stops + +Listener Registration +--------------------- + +A listener plugin holding a reference to the listener object is registered +as part of an Airflow plugin. The following is a +skeleton for implementing a new listener: + +.. code-block:: python + + from airflow.plugins_manager import AirflowPlugin + + # This is the listener file created where custom code to monitor is added over hookimpl + import listener + + + class MetadataCollectionPlugin(AirflowPlugin): + name = "MetadataCollectionPlugin" + listeners = [listener] + + +Next, we can check the code added into ``listener`` and see the implementation +methods for each of those listeners.
After the implementation, the listener part +gets executed for every task execution across all the DAGs. + +For reference, here is the listener code from the example module ``event_listener.py``. + +This example listens for a task instance entering the running state: + +.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py + :language: python + :start-after: [START howto_listen_ti_running_task] + :end-before: [END howto_listen_ti_running_task] + +Similarly, code to listen for task instance success and failure can be implemented. + +This example listens for a dag run changing to the failed state: + +.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py + :language: python + :start-after: [START howto_listen_dagrun_failure_task] + :end-before: [END howto_listen_dagrun_failure_task] + +Similarly, code to listen for dag run success and running states can be implemented. + +The listener plugin files required to add the listener implementation are added as part of the +Airflow plugin into the ``$AIRFLOW_HOME/plugins/`` folder and loaded during Airflow startup. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 734723894a..5a62b70939 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -123,6 +123,7 @@ backfill backfillable backfilled backfilling +backfilljob BackfillJobTest Backfills backfills
