amoghrajesh commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2564460082
##########
airflow-core/newsfragments/54505.significant.rst:
##########
@@ -0,0 +1,60 @@
+Move task-level exception imports into the Task SDK
+
+Airflow now sources task-facing exceptions (``AirflowSkipException``, ``TaskDeferred``, etc.) from
+``airflow.sdk.exceptions``. ``airflow.exceptions`` still exposes the same symbols, but they are
Review Comment:
```suggestion
``airflow.sdk.exceptions``. ``airflow.exceptions`` still exposes the same exceptions, but they are
```
##########
airflow-core/newsfragments/54505.significant.rst:
##########
@@ -0,0 +1,60 @@
+Move task-level exception imports into the Task SDK
+
+Airflow now sources task-facing exceptions (``AirflowSkipException``, ``TaskDeferred``, etc.) from
+``airflow.sdk.exceptions``. ``airflow.exceptions`` still exposes the same symbols, but they are
+proxies that emit ``DeprecatedImportWarning`` so Dag authors can migrate before the shim is removed.
+
+**What changed:**
+
+- Runtime code now consistently raises the SDK versions of task-level exceptions.
+- The Task SDK redefines these classes so workers no longer depend on ``airflow-core`` at runtime.
+- ``airflow.providers.common.compat.sdk`` centralizes compatibility imports for providers.
+
+**Behaviour changes:**
+
+- Sensors and other helpers that validate user input now raise ``ValueError`` (instead of
+ ``AirflowException``) when ``poke_interval``/ ``timeout`` arguments are invalid.
+- Importing deprecated exception names from ``airflow.exceptions`` logs a warning directing users to
+ the SDK import path.
+
+**Exceptions now provided by ``airflow.sdk.exceptions``:**
+
+- ``AirflowException`` and ``AirflowNotFoundException``
+- ``AirflowRescheduleException`` and ``AirflowSensorTimeout``
+- ``AirflowSkipException``, ``AirflowFailException``, ``AirflowTaskTimeout``, ``AirflowTaskTerminated``
+- ``TaskDeferred``, ``TaskDeferralTimeout``, ``TaskDeferralError``
+- ``DagRunTriggerException`` and ``DownstreamTasksSkipped``
+- ``AirflowDagCycleException`` and ``AirflowInactiveAssetInInletOrOutletException``
+- ``ParamValidationError``, ``DuplicateTaskIdFound``, ``TaskAlreadyInTaskGroup``, ``TaskNotFound``, ``XComNotFound``
+
+**Backwards compatibility:**
Review Comment:
```suggestion
**Backward compatibility:**
```
##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -21,121 +21,66 @@
from __future__ import annotations
-from collections.abc import Collection, Sequence
-from datetime import datetime, timedelta
from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, NamedTuple
+from typing import TYPE_CHECKING, NamedTuple
if TYPE_CHECKING:
from airflow.models import DagRun
- from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
- from airflow.utils.state import DagRunState
# Re exporting AirflowConfigException from shared configuration
from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException
+try:
+ from airflow.sdk.exceptions import (
+ AirflowException,
+ AirflowNotFoundException,
+ AirflowRescheduleException,
+ TaskNotFound,
+ )
+except ModuleNotFoundError:
+ # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed.
+ # In that case, we define fallback exception classes that mirror the SDK ones.
Review Comment:
```suggestion
# Compatibility for airflow versions < 3.2
# We define fallback exception classes that mirror the SDK ones
```
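For readers of this thread, the try/except pattern under discussion can be sketched as follows. This is a minimal illustration, not the code in the PR: the bodies of the fallback classes are assumptions, since the diff above does not show them.

```python
try:
    # Preferred source when the Task SDK is importable.
    from airflow.sdk.exceptions import AirflowException, AirflowNotFoundException
except ModuleNotFoundError:
    # Fallback definitions that mirror the SDK names.
    # Assumption: the real fallback classes may carry extra attributes.
    class AirflowException(Exception):
        """Stand-in base class for Airflow errors."""

    class AirflowNotFoundException(AirflowException):
        """Stand-in for the 'resource not found' error."""
```

Catching ``ModuleNotFoundError`` rather than the broader ``ImportError`` means a missing name inside an installed SDK still surfaces as an error instead of silently selecting the fallback.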
##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -518,23 +303,44 @@ def __init__(self, dag_id: str | None = None, message: str | None = None):
super().__init__(f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'")
+class AirflowClearRunningTaskException(AirflowException):
+ """Raise when the user attempts to clear currently running tasks."""
+
+
+_DEPRECATED_EXCEPTIONS = {
+ "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
+ "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
+ "FailFastDagInvalidTriggerRule": "airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
+ "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
+ "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
+ "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
+ "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
+ "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+ "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+ "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
+ "AirflowDagCycleException": "airflow.sdk.exceptions.AirflowDagCycleException",
+ "AirflowInactiveAssetInInletOrOutletException": "airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
+ "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
+ "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+ "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
+ "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
+ "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+}
+
+
def __getattr__(name: str):
"""Provide backward compatibility for moved exceptions."""
- if name == "AirflowDagCycleException":
+ if name in _DEPRECATED_EXCEPTIONS:
import warnings
- from airflow.sdk.exceptions import AirflowDagCycleException
+ from airflow import DeprecatedImportWarning
+ from airflow.utils.module_loading import import_string
+ target_path = _DEPRECATED_EXCEPTIONS[name]
warnings.warn(
- "airflow.exceptions.AirflowDagCycleException is deprecated. "
- "Use airflow.sdk.exceptions.AirflowDagCycleException instead.",
- DeprecationWarning,
+ f"airflow.exceptions.{name} is deprecated. Use {target_path} instead.",
Review Comment:
```suggestion
f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {target_path} instead.",
```
nit: This keeps the wording aligned with most other occurrences.
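As a self-contained illustration of the shim pattern in this hunk (a module-level ``__getattr__`` as described in PEP 562, driven by a name-to-path mapping), here is a rough sketch that runs without Airflow. The module names ``demo_old_exceptions`` and ``demo_sdk_exceptions`` are hypothetical stand-ins, and the sketch uses the built-in ``DeprecationWarning`` and ``importlib`` in place of ``DeprecatedImportWarning`` and ``import_string``.

```python
import importlib
import sys
import types
import warnings

# Hypothetical "new home" module holding the moved exception (not Airflow code).
new_home = types.ModuleType("demo_sdk_exceptions")


class TaskDeferred(Exception):
    """Stand-in for an exception that moved to the new module."""


new_home.TaskDeferred = TaskDeferred
sys.modules["demo_sdk_exceptions"] = new_home

# Old name -> dotted path of the new location, in the spirit of _DEPRECATED_EXCEPTIONS.
_MOVED = {"TaskDeferred": "demo_sdk_exceptions.TaskDeferred"}


def _lazy_getattr(name: str):
    """Module-level __getattr__: called only when normal attribute lookup fails."""
    if name in _MOVED:
        target_path = _MOVED[name]
        warnings.warn(
            f"demo_old_exceptions.{name} is deprecated and will be removed in a "
            f"future version. Use {target_path} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        module_path, _, attr = target_path.rpartition(".")
        return getattr(importlib.import_module(module_path), attr)
    raise AttributeError(f"module 'demo_old_exceptions' has no attribute {name!r}")


# Hypothetical "old home" module that only proxies to the new location.
old_home = types.ModuleType("demo_old_exceptions")
old_home.__getattr__ = _lazy_getattr
sys.modules["demo_old_exceptions"] = old_home


def import_from_old_home():
    """Import via the deprecated path and return the class plus any warning messages."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        from demo_old_exceptions import TaskDeferred as Old
    return Old, [str(w.message) for w in caught]
```

Calling ``import_from_old_home()`` returns the same class object that lives in the new module, together with the deprecation message, so existing ``except`` clauses keep matching while users are pointed at the new import path.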