This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 24f27d94d30 Add on_task_instance_skipped listener hookspec (#59467)
24f27d94d30 is described below

commit 24f27d94d306b054c1c4a8278eabfa7f98cc61ca
Author: tstrilka <[email protected]>
AuthorDate: Sat Jan 3 10:34:33 2026 +0100

    Add on_task_instance_skipped listener hookspec (#59467)
    
    Co-authored-by: Kacper Muda <[email protected]>
---
 .../src/airflow/listeners/spec/taskinstance.py     | 25 ++++++++++++++++
 .../src/airflow/sdk/execution_time/task_runner.py  |  6 ++++
 .../task_sdk/execution_time/test_task_runner.py    | 35 ++++++++++++++++++++++
 3 files changed, 66 insertions(+)

diff --git a/airflow-core/src/airflow/listeners/spec/taskinstance.py b/airflow-core/src/airflow/listeners/spec/taskinstance.py
index 7e4bd5ecd87..75b98e8a7b5 100644
--- a/airflow-core/src/airflow/listeners/spec/taskinstance.py
+++ b/airflow-core/src/airflow/listeners/spec/taskinstance.py
@@ -48,3 +48,28 @@ def on_task_instance_failed(
     error: None | str | BaseException,
 ):
     """Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+    previous_state: TaskInstanceState | None,
+    task_instance: RuntimeTaskInstance | TaskInstance,
+):
+    """
+    Execute when a task instance skips itself during execution.
+
+    This hook is called only when a task has started execution and then
+    intentionally skips itself (e.g., by raising AirflowSkipException).
+
+    Note: This function will NOT cover tasks that were skipped by scheduler, before execution began, such as:
+        - Skips due to trigger rules (e.g., upstream failures)
+        - Skips from operators like BranchPythonOperator, ShortCircuitOperator, or similar mechanisms
+        - Any other situation in which the scheduler decides not to schedule a task for execution
+
+    For comprehensive tracking of skipped tasks, use DAG-level listeners
+    (on_dag_run_success/on_dag_run_failed) which may have access to all task states.
+
+    :param previous_state: Previous state of the task instance (can be None)
+    :param task_instance: The task instance object (RuntimeTaskInstance when called
+        from task execution context, TaskInstance when called from API server)
+    """
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 302c2152854..e910d8de6d7 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1608,6 +1608,12 @@ def finalize(
             log.exception("error calling listener")
     elif state == TaskInstanceState.SKIPPED:
         _run_task_state_change_callbacks(task, "on_skipped_callback", context, log)
+        try:
+            get_listener_manager().hook.on_task_instance_skipped(
+                previous_state=TaskInstanceState.RUNNING, task_instance=ti
+            )
+        except Exception:
+            log.exception("error calling listener")
     elif state == TaskInstanceState.UP_FOR_RETRY:
         _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
         try:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 406a48a7ba8..64c4c56719a 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3236,6 +3236,10 @@ class TestTaskRunnerCallsListeners:
             self.state.append(TaskInstanceState.FAILED)
             self.error = error
 
+        @hookimpl
+        def on_task_instance_skipped(self, previous_state, task_instance):
+            self.state.append(TaskInstanceState.SKIPPED)
+
         @hookimpl
         def before_stopping(self, component):
             self.component = component
@@ -3386,6 +3390,37 @@ class TestTaskRunnerCallsListeners:
         assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
         assert listener.error == error
 
+    def test_task_runner_calls_listeners_skipped(self, mocked_parse, mock_supervisor_comms):
+        listener = self.CustomListener()
+        get_listener_manager().add_listener(listener)
+
+        class CustomOperator(BaseOperator):
+            def execute(self, context):
+                raise AirflowSkipException("Task intentionally skipped")
+
+        task = CustomOperator(
+            task_id="test_task_runner_calls_listeners_skipped", do_xcom_push=True, multiple_outputs=True
+        )
+        dag = get_inline_dag(dag_id="test_dag", task=task)
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id=task.task_id,
+            dag_id=dag.dag_id,
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        runtime_ti = RuntimeTaskInstance.model_construct(
+            **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow()
+        )
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+        state, _, _ = run(runtime_ti, context, log)
+        finalize(runtime_ti, state, context, log)
+
+        assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SKIPPED]
+
     def test_listener_access_outlet_event_on_running_and_success(self, mocked_parse, mock_supervisor_comms):
         """Test listener can access outlet events through invoking get_template_context() while task running and success"""
         listener = self.CustomOutletEventsListener()

Reply via email to