amoghrajesh commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2554992216


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -81,13 +76,18 @@
     PodNotFoundException,
     PodPhase,
 )
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.cncf.kubernetes.version_compat import (
+    AIRFLOW_V_3_1_PLUS,
+    AirflowSkipException,
+    TaskDeferred,
+)
 from airflow.providers.common.compat.sdk import XCOM_RETURN_KEY
 
 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseOperator
 else:
     from airflow.models import BaseOperator
+from airflow.exceptions import AirflowException

Review Comment:
   Or from version compat since you have defined them



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -81,13 +76,18 @@
     PodNotFoundException,
     PodPhase,
 )
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.cncf.kubernetes.version_compat import (
+    AIRFLOW_V_3_1_PLUS,
+    AirflowSkipException,
+    TaskDeferred,
+)
 from airflow.providers.common.compat.sdk import XCOM_RETURN_KEY
 
 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseOperator
 else:
     from airflow.models import BaseOperator
+from airflow.exceptions import AirflowException

Review Comment:
   Should this not be from airflow.sdk?



##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -21,29 +21,35 @@
 
 from __future__ import annotations
 
-from collections.abc import Collection, Sequence
-from datetime import datetime, timedelta
+import os
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, NamedTuple
+from typing import TYPE_CHECKING, NamedTuple
 
 if TYPE_CHECKING:
     from airflow.models import DagRun
-    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, 
AssetUriRef
-    from airflow.utils.state import DagRunState
 
+# When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed
+# In that case, we define fallback exception classes
+if os.environ.get("_AIRFLOW__AS_LIBRARY"):
+    try:
+        from airflow.sdk.exceptions import AirflowException, 
AirflowNotFoundException
+    except ImportError:
+        # Fallback exception classes when airflow.sdk is not installed
+        class AirflowException(RuntimeError):  # type: ignore[no-redef]
+            """Base exception for Airflow errors."""
 
-class AirflowException(Exception):
-    """
-    Base class for all Airflow's errors.
+            pass
 
-    Each custom exception should be derived from this class.
-    """
+        class AirflowNotFoundException(AirflowException):  # type: 
ignore[no-redef]
+            """Raise when a requested object is not found."""
 
-    status_code = HTTPStatus.INTERNAL_SERVER_ERROR
+            pass
+else:
+    from airflow.sdk.exceptions import AirflowException, 
AirflowNotFoundException
 
-    def serialize(self):
-        cls = self.__class__
-        return f"{cls.__module__}.{cls.__name__}", (str(self),), {}
+
+class TaskNotFound(AirflowException):
+    """Raise when a Task is not available in the system."""

Review Comment:
   This one is present in sdk already right? Why do we need to redefine?



##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -519,19 +304,40 @@ def __init__(self, dag_id: str | None = None, message: 
str | None = None):
             super().__init__(f"An unexpected error occurred while trying to 
deserialize Dag '{dag_id}'")
 
 
+_DEPRECATED_EXCEPTIONS = {
+    "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
+    "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
+    "FailFastDagInvalidTriggerRule": 
"airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
+    "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
+    "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
+    "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
+    "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
+    "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+    "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+    "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
+    "AirflowDagCycleException": 
"airflow.sdk.exceptions.AirflowDagCycleException",
+    "AirflowInactiveAssetInInletOrOutletException": 
"airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
+    "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
+    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+    "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
+    "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
+    "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+}
+
+
 def __getattr__(name: str):
     """Provide backward compatibility for moved exceptions."""
-    if name == "AirflowDagCycleException":
+    if name in _DEPRECATED_EXCEPTIONS:
         import warnings
+        from importlib import import_module
+        from operator import attrgetter
 
-        from airflow.sdk.exceptions import AirflowDagCycleException
-
+        target_path = _DEPRECATED_EXCEPTIONS[name]
         warnings.warn(
-            "airflow.exceptions.AirflowDagCycleException is deprecated. "
-            "Use airflow.sdk.exceptions.AirflowDagCycleException instead.",
+            f"airflow.exceptions.{name} is deprecated. Use {target_path} 
instead.",
             DeprecationWarning,
             stacklevel=2,
         )
-        return AirflowDagCycleException
-
+        module_path, attr_name = target_path.rsplit(".", 1)
+        return attrgetter(attr_name)(import_module(module_path))

Review Comment:
   You could also use a simpler pattern like this:
   ```python
   def __getattr__(name: str):
       """Provide backward compatibility for moved functions in this module."""
       if name == "render_template_as_native":
           import warnings
   
           from airflow.sdk.definitions.context import render_template_as_native
   
           warnings.warn(
               "airflow.utils.helpers.render_template_as_native is deprecated. "
               "Use airflow.sdk.definitions.context.render_template_as_native 
instead.",
               DeprecationWarning,
               stacklevel=2,
           )
           return render_template_as_native
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to