amoghrajesh commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2556251493


##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -518,23 +303,43 @@ def __init__(self, dag_id: str | None = None, message: 
str | None = None):
             super().__init__(f"An unexpected error occurred while trying to 
deserialize Dag '{dag_id}'")
 
 
+class AirflowClearRunningTaskException(AirflowException):
+    """Raise when the user attempts to clear currently running tasks."""
+
+
+_DEPRECATED_EXCEPTIONS = {
+    "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
+    "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
+    "FailFastDagInvalidTriggerRule": 
"airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
+    "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
+    "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
+    "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
+    "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
+    "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+    "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+    "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
+    "AirflowDagCycleException": 
"airflow.sdk.exceptions.AirflowDagCycleException",
+    "AirflowInactiveAssetInInletOrOutletException": 
"airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
+    "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
+    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+    "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
+    "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
+    "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+}
+
+
 def __getattr__(name: str):
     """Provide backward compatibility for moved exceptions."""
-    if name == "AirflowDagCycleException":
+    if name in _DEPRECATED_EXCEPTIONS:
         import warnings
 
-        from airflow.sdk.exceptions import AirflowDagCycleException
+        from airflow.utils.module_loading import import_string
 
+        target_path = _DEPRECATED_EXCEPTIONS[name]
         warnings.warn(
-            "airflow.exceptions.AirflowDagCycleException is deprecated. "
-            "Use airflow.sdk.exceptions.AirflowDagCycleException instead.",
+            f"airflow.exceptions.{name} is deprecated. Use {target_path} 
instead.",
             DeprecationWarning,
             stacklevel=2,
         )
-        return AirflowDagCycleException
-
+        return import_string(target_path)

Review Comment:
   Looks far better now!



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py:
##########
@@ -16,9 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from airflow.exceptions import (
-    AirflowException,
-)
+from airflow.exceptions import AirflowException

Review Comment:
   I left a comment about this earlier, but it can be done in a follow-up, no 
need to do in this PR.



##########
task-sdk/src/airflow/sdk/exceptions.py:
##########
@@ -18,15 +18,38 @@
 from __future__ import annotations
 
 import enum
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any
 
-from airflow.exceptions import AirflowException
 from airflow.sdk import TriggerRule
 
 if TYPE_CHECKING:
+    from collections.abc import Collection
+
+    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, 
AssetUriRef
     from airflow.sdk.execution_time.comms import ErrorResponse
 
 
+class AirflowException(RuntimeError):

Review Comment:
   I have just one q regarding the backcompat here now. If there's a custom 
provider code like this one:
   
   ```python
   Python 3.13.3 (main, Apr  8 2025, 13:54:08) [Clang 17.0.0 
(clang-1700.0.13.3)] on darwin
   class AirflowExceptionOld(Exception):
       pass
   def submit_to_backend():
       raise AirflowException("Invalid job config")
   def run_job():
       try:
           submit_to_backend()
       except RuntimeError:
           return "retry"
       except AirflowException:
           return "validation_failed"
       
   AirflowException = AirflowExceptionOld
   run_job()
   Out[6]: 'validation_failed'
   class AirflowExceptionNew(RuntimeError):
       pass
   AirflowException = AirflowExceptionNew
   run_job()
   Out[9]: 'retry'
   ```
   
   Previously this would have executed a different branch but now it executes a 
different branch



##########
task-sdk/src/airflow/sdk/definitions/operator_resources.py:
##########
@@ -41,7 +40,7 @@ class Resource:
 
     def __init__(self, name, units_str, qty):
         if qty < 0:
-            raise AirflowException(
+            raise RuntimeError(

Review Comment:
   Shouldn't this be a ValueError?



##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -254,7 +253,7 @@ def add(self, task: DAGNode) -> DAGNode:
                     )
                 task.dag = self.dag
             if task.children:
-                raise AirflowException("Cannot add a non-empty TaskGroup")
+                raise RuntimeError("Cannot add a non-empty TaskGroup")

Review Comment:
   Should this be ValueError



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to