amoghrajesh commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2564460082


##########
airflow-core/newsfragments/54505.significant.rst:
##########
@@ -0,0 +1,60 @@
+Move task-level exception imports into the Task SDK
+
+Airflow now sources task-facing exceptions (``AirflowSkipException``, 
``TaskDeferred``, etc.) from
+``airflow.sdk.exceptions``. ``airflow.exceptions`` still exposes the same 
symbols, but they are

Review Comment:
   ```suggestion
   ``airflow.sdk.exceptions``. ``airflow.exceptions`` still exposes the same 
exceptions, but they are
   ```



##########
airflow-core/newsfragments/54505.significant.rst:
##########
@@ -0,0 +1,60 @@
+Move task-level exception imports into the Task SDK
+
+Airflow now sources task-facing exceptions (``AirflowSkipException``, 
``TaskDeferred``, etc.) from
+``airflow.sdk.exceptions``. ``airflow.exceptions`` still exposes the same 
symbols, but they are
+proxies that emit ``DeprecatedImportWarning`` so Dag authors can migrate 
before the shim is removed.
+
+**What changed:**
+
+- Runtime code now consistently raises the SDK versions of task-level 
exceptions.
+- The Task SDK redefines these classes so workers no longer depend on 
``airflow-core`` at runtime.
+- ``airflow.providers.common.compat.sdk`` centralizes compatibility imports 
for providers.
+
+**Behaviour changes:**
+
+- Sensors and other helpers that validate user input now raise ``ValueError`` 
(instead of
+  ``AirflowException``) when ``poke_interval``/ ``timeout`` arguments are 
invalid.
+- Importing deprecated exception names from ``airflow.exceptions`` logs a 
warning directing users to
+  the SDK import path.
+
+**Exceptions now provided by ``airflow.sdk.exceptions``:**
+
+- ``AirflowException`` and ``AirflowNotFoundException``
+- ``AirflowRescheduleException`` and ``AirflowSensorTimeout``
+- ``AirflowSkipException``, ``AirflowFailException``, ``AirflowTaskTimeout``, 
``AirflowTaskTerminated``
+- ``TaskDeferred``, ``TaskDeferralTimeout``, ``TaskDeferralError``
+- ``DagRunTriggerException`` and ``DownstreamTasksSkipped``
+- ``AirflowDagCycleException`` and 
``AirflowInactiveAssetInInletOrOutletException``
+- ``ParamValidationError``, ``DuplicateTaskIdFound``, 
``TaskAlreadyInTaskGroup``, ``TaskNotFound``, ``XComNotFound``
+
+**Backwards compatibility:**

Review Comment:
   ```suggestion
   **Backward compatibility:**
   ```



##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -21,121 +21,66 @@
 
 from __future__ import annotations
 
-from collections.abc import Collection, Sequence
-from datetime import datetime, timedelta
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, NamedTuple
+from typing import TYPE_CHECKING, NamedTuple
 
 if TYPE_CHECKING:
     from airflow.models import DagRun
-    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, 
AssetUriRef
-    from airflow.utils.state import DagRunState
 
 # Re exporting AirflowConfigException from shared configuration
 from airflow._shared.configuration.exceptions import AirflowConfigException as 
AirflowConfigException
 
+try:
+    from airflow.sdk.exceptions import (
+        AirflowException,
+        AirflowNotFoundException,
+        AirflowRescheduleException,
+        TaskNotFound,
+    )
+except ModuleNotFoundError:
+    # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed.
+    # In that case, we define fallback exception classes that mirror the SDK 
ones.

Review Comment:
   ```suggestion
       # Compatibility for airflow versions < 3.2
       # We define fallback exception classes that mirror the SDK ones
   ```



##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -518,23 +303,44 @@ def __init__(self, dag_id: str | None = None, message: 
str | None = None):
             super().__init__(f"An unexpected error occurred while trying to 
deserialize Dag '{dag_id}'")
 
 
+class AirflowClearRunningTaskException(AirflowException):
+    """Raise when the user attempts to clear currently running tasks."""
+
+
+_DEPRECATED_EXCEPTIONS = {
+    "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
+    "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
+    "FailFastDagInvalidTriggerRule": 
"airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
+    "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
+    "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
+    "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
+    "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
+    "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+    "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+    "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
+    "AirflowDagCycleException": 
"airflow.sdk.exceptions.AirflowDagCycleException",
+    "AirflowInactiveAssetInInletOrOutletException": 
"airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
+    "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
+    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+    "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
+    "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
+    "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+}
+
+
 def __getattr__(name: str):
     """Provide backward compatibility for moved exceptions."""
-    if name == "AirflowDagCycleException":
+    if name in _DEPRECATED_EXCEPTIONS:
         import warnings
 
-        from airflow.sdk.exceptions import AirflowDagCycleException
+        from airflow import DeprecatedImportWarning
+        from airflow.utils.module_loading import import_string
 
+        target_path = _DEPRECATED_EXCEPTIONS[name]
         warnings.warn(
-            "airflow.exceptions.AirflowDagCycleException is deprecated. "
-            "Use airflow.sdk.exceptions.AirflowDagCycleException instead.",
-            DeprecationWarning,
+            f"airflow.exceptions.{name} is deprecated. Use {target_path} 
instead.",

Review Comment:
   ```suggestion
               f"airflow.exceptions.{name} is deprecated and will be removed in 
a future version. Use {target_path} instead.",
   ```
   
   nit: Keeping it aligned with most other occurrences



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to