This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 24f27d94d30 Add on_task_instance_skipped listener hookspec (#59467)
24f27d94d30 is described below
commit 24f27d94d306b054c1c4a8278eabfa7f98cc61ca
Author: tstrilka <[email protected]>
AuthorDate: Sat Jan 3 10:34:33 2026 +0100
Add on_task_instance_skipped listener hookspec (#59467)
Co-authored-by: Kacper Muda <[email protected]>
---
.../src/airflow/listeners/spec/taskinstance.py | 25 ++++++++++++++++
.../src/airflow/sdk/execution_time/task_runner.py | 6 ++++
.../task_sdk/execution_time/test_task_runner.py | 35 ++++++++++++++++++++++
3 files changed, 66 insertions(+)
diff --git a/airflow-core/src/airflow/listeners/spec/taskinstance.py b/airflow-core/src/airflow/listeners/spec/taskinstance.py
index 7e4bd5ecd87..75b98e8a7b5 100644
--- a/airflow-core/src/airflow/listeners/spec/taskinstance.py
+++ b/airflow-core/src/airflow/listeners/spec/taskinstance.py
@@ -48,3 +48,28 @@ def on_task_instance_failed(
     error: None | str | BaseException,
 ):
     """Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+    previous_state: TaskInstanceState | None,
+    task_instance: RuntimeTaskInstance | TaskInstance,
+):
+    """
+    Execute when a task instance skips itself during execution.
+
+    This hook is called only when a task has started execution and then
+    intentionally skips itself (e.g., by raising AirflowSkipException).
+
+    Note: This function will NOT cover tasks that were skipped by scheduler, before execution began, such as:
+    - Skips due to trigger rules (e.g., upstream failures)
+    - Skips from operators like BranchPythonOperator, ShortCircuitOperator, or similar mechanisms
+    - Any other situation in which the scheduler decides not to schedule a task for execution
+
+    For comprehensive tracking of skipped tasks, use DAG-level listeners
+    (on_dag_run_success/on_dag_run_failed) which may have access to all task states.
+
+    :param previous_state: Previous state of the task instance (can be None)
+    :param task_instance: The task instance object (RuntimeTaskInstance when called
+        from task execution context, TaskInstance when called from API server)
+    """
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 302c2152854..e910d8de6d7 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1608,6 +1608,12 @@ def finalize(
             log.exception("error calling listener")
     elif state == TaskInstanceState.SKIPPED:
         _run_task_state_change_callbacks(task, "on_skipped_callback", context, log)
+        try:
+            get_listener_manager().hook.on_task_instance_skipped(
+                previous_state=TaskInstanceState.RUNNING, task_instance=ti
+            )
+        except Exception:
+            log.exception("error calling listener")
     elif state == TaskInstanceState.UP_FOR_RETRY:
         _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
         try:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 406a48a7ba8..64c4c56719a 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3236,6 +3236,10 @@ class TestTaskRunnerCallsListeners:
             self.state.append(TaskInstanceState.FAILED)
             self.error = error
 
+        @hookimpl
+        def on_task_instance_skipped(self, previous_state, task_instance):
+            self.state.append(TaskInstanceState.SKIPPED)
+
         @hookimpl
         def before_stopping(self, component):
             self.component = component
@@ -3386,6 +3390,37 @@ class TestTaskRunnerCallsListeners:
         assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
         assert listener.error == error
 
+    def test_task_runner_calls_listeners_skipped(self, mocked_parse, mock_supervisor_comms):
+        listener = self.CustomListener()
+        get_listener_manager().add_listener(listener)
+
+        class CustomOperator(BaseOperator):
+            def execute(self, context):
+                raise AirflowSkipException("Task intentionally skipped")
+
+        task = CustomOperator(
+            task_id="test_task_runner_calls_listeners_skipped", do_xcom_push=True, multiple_outputs=True
+        )
+        dag = get_inline_dag(dag_id="test_dag", task=task)
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id=task.task_id,
+            dag_id=dag.dag_id,
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        runtime_ti = RuntimeTaskInstance.model_construct(
+            **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow()
+        )
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+        state, _, _ = run(runtime_ti, context, log)
+        finalize(runtime_ti, state, context, log)
+
+        assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SKIPPED]
+
     def test_listener_access_outlet_event_on_running_and_success(self, mocked_parse, mock_supervisor_comms):
         """Test listener can access outlet events through invoking get_template_context() while task running and success"""
         listener = self.CustomOutletEventsListener()