kacpermuda commented on code in PR #59467:
URL: https://github.com/apache/airflow/pull/59467#discussion_r2619816175
##########
airflow-core/src/airflow/listeners/spec/taskinstance.py:
##########
@@ -48,3 +48,29 @@ def on_task_instance_failed(
error: None | str | BaseException,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+ previous_state: TaskInstanceState | None,
+ task_instance: RuntimeTaskInstance | TaskInstance,
+):
+ """
+ Execute when a task instance skips itself by raising AirflowSkipException.
+
+ This hook is called only when a task has started execution and then
+ intentionally skips itself by raising AirflowSkipException.
+
+ Note: This does NOT cover tasks skipped by:
+ - Trigger rules (e.g., upstream failures)
+ - BranchPythonOperator (tasks not in selected branch)
+ - ShortCircuitOperator
+ - Scheduler-level decisions
+
+ For comprehensive skip tracking, use DAG-level listeners
Review Comment:
```suggestion
For comprehensive tracking of skipped tasks, use DAG-level listeners
```
##########
airflow-core/src/airflow/listeners/spec/taskinstance.py:
##########
@@ -48,3 +48,29 @@ def on_task_instance_failed(
error: None | str | BaseException,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+ previous_state: TaskInstanceState | None,
+ task_instance: RuntimeTaskInstance | TaskInstance,
+):
+ """
+ Execute when a task instance skips itself by raising AirflowSkipException.
Review Comment:
```suggestion
Execute when a task instance skips itself during execution.
```
##########
airflow-core/src/airflow/listeners/spec/taskinstance.py:
##########
@@ -48,3 +48,29 @@ def on_task_instance_failed(
error: None | str | BaseException,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+ previous_state: TaskInstanceState | None,
+ task_instance: RuntimeTaskInstance | TaskInstance,
+):
+ """
+ Execute when a task instance skips itself by raising AirflowSkipException.
+
+ This hook is called only when a task has started execution and then
+ intentionally skips itself by raising AirflowSkipException.
+
+ Note: This does NOT cover tasks skipped by:
+ - Trigger rules (e.g., upstream failures)
+ - BranchPythonOperator (tasks not in selected branch)
+ - ShortCircuitOperator
+ - Scheduler-level decisions
Review Comment:
```suggestion
Note: This function will NOT cover tasks that were skipped by scheduler,
before execution began, such as:
- Skips due to trigger rules (e.g., upstream failures)
- Skips from operators like BranchPythonOperator,
ShortCircuitOperator, or similar mechanisms
- Any other situation in which the scheduler decides not to schedule
a task for execution
```
##########
airflow-core/src/airflow/listeners/spec/taskinstance.py:
##########
@@ -48,3 +48,29 @@ def on_task_instance_failed(
error: None | str | BaseException,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+ previous_state: TaskInstanceState | None,
+ task_instance: RuntimeTaskInstance | TaskInstance,
+):
+ """
+ Execute when a task instance skips itself by raising AirflowSkipException.
+
+ This hook is called only when a task has started execution and then
+ intentionally skips itself by raising AirflowSkipException.
+
+ Note: This does NOT cover tasks skipped by:
+ - Trigger rules (e.g., upstream failures)
+ - BranchPythonOperator (tasks not in selected branch)
+ - ShortCircuitOperator
+ - Scheduler-level decisions
+
+ For comprehensive skip tracking, use DAG-level listeners
+ (on_dag_run_success/on_dag_run_failed) which provide complete task state.
Review Comment:
```suggestion
(on_dag_run_success/on_dag_run_failed) which may have access to all task
states.
```
##########
airflow-core/src/airflow/listeners/spec/taskinstance.py:
##########
@@ -48,3 +48,29 @@ def on_task_instance_failed(
error: None | str | BaseException,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
+
+
+@hookspec
+def on_task_instance_skipped(
+ previous_state: TaskInstanceState | None,
+ task_instance: RuntimeTaskInstance | TaskInstance,
+):
+ """
+ Execute when a task instance skips itself by raising AirflowSkipException.
+
+ This hook is called only when a task has started execution and then
+ intentionally skips itself by raising AirflowSkipException.
Review Comment:
```suggestion
intentionally skips itself (e.g., by raising AirflowSkipException).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]