ashb commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2472656186


##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -21,29 +21,26 @@
 
 from __future__ import annotations
 
-from collections.abc import Collection, Sequence
-from datetime import datetime, timedelta
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, NamedTuple
+from typing import TYPE_CHECKING, NamedTuple
 
 if TYPE_CHECKING:
     from airflow.models import DagRun
-    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, 
AssetUriRef
-    from airflow.utils.state import DagRunState
 
+try:
+    from airflow.sdk.exceptions import AirflowException
+except ModuleNotFoundError:
+    # The shared libraries are unable to see the 'sdk' package, so redefine 
here

Review Comment:
   I don't quite understand what this comment means.



##########
providers/common/sql/tests/unit/common/sql/operators/test_sql.py:
##########
@@ -59,7 +59,11 @@
 from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, 
clear_db_runs, clear_db_xcom
 from tests_common.test_utils.markers import 
skip_if_force_lowest_dependencies_marker
 from tests_common.test_utils.providers import get_provider_min_airflow_version
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_1, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import (
+    AIRFLOW_V_3_0_1,
+    AIRFLOW_V_3_0_PLUS,
+    AirflowFailException,

Review Comment:
   Ditto, `AirflowException` from this module? 🤷🏻 



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -81,13 +76,18 @@
     PodNotFoundException,
     PodPhase,
 )
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.cncf.kubernetes.version_compat import (
+    AIRFLOW_V_3_1_PLUS,
+    AirflowSkipException,
+    TaskDeferred,
+)
 from airflow.providers.common.compat.sdk import XCOM_RETURN_KEY
 
 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseOperator
 else:
     from airflow.models import BaseOperator
+from airflow.exceptions import AirflowException

Review Comment:
   Shouldn't this be in `airflow.providers.cncf.kubernetes.version_compat` too?



##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float | 
timedelta) -> timedelta:
             return poke_interval
         if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
             return timedelta(seconds=poke_interval)
-        raise AirflowException(
-            "Operator arg `poke_interval` must be timedelta object or a 
non-negative number"
-        )
+        raise RuntimeError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")
 
     @staticmethod
     def _coerce_timeout(timeout: float | timedelta) -> timedelta:
         if isinstance(timeout, timedelta):
             return timeout
         if isinstance(timeout, (int, float)) and timeout >= 0:
             return timedelta(seconds=timeout)
-        raise AirflowException("Operator arg `timeout` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `timeout` must be timedelta object or 
a non-negative number")
 
     @staticmethod
     def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta | 
None:
         if max_wait is None or isinstance(max_wait, timedelta):
             return max_wait
         if isinstance(max_wait, (int, float)) and max_wait >= 0:
             return timedelta(seconds=max_wait)
-        raise AirflowException("Operator arg `max_wait` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `max_wait` must be timedelta object 
or a non-negative number")
 
     def _validate_input_values(self) -> None:
         if not isinstance(self.poke_interval, (int, float)) or 
self.poke_interval < 0:
-            raise AirflowException("The poke_interval must be a non-negative 
number")
+            raise RuntimeError("The poke_interval must be a non-negative 
number")

Review Comment:
   ```suggestion
               raise ValueError("The poke_interval must be a non-negative 
number")
   ```



##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float | 
timedelta) -> timedelta:
             return poke_interval
         if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
             return timedelta(seconds=poke_interval)
-        raise AirflowException(
-            "Operator arg `poke_interval` must be timedelta object or a 
non-negative number"
-        )
+        raise RuntimeError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")
 
     @staticmethod
     def _coerce_timeout(timeout: float | timedelta) -> timedelta:
         if isinstance(timeout, timedelta):
             return timeout
         if isinstance(timeout, (int, float)) and timeout >= 0:
             return timedelta(seconds=timeout)
-        raise AirflowException("Operator arg `timeout` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `timeout` must be timedelta object or 
a non-negative number")
 
     @staticmethod
     def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta | 
None:
         if max_wait is None or isinstance(max_wait, timedelta):
             return max_wait
         if isinstance(max_wait, (int, float)) and max_wait >= 0:
             return timedelta(seconds=max_wait)
-        raise AirflowException("Operator arg `max_wait` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `max_wait` must be timedelta object 
or a non-negative number")
 
     def _validate_input_values(self) -> None:
         if not isinstance(self.poke_interval, (int, float)) or 
self.poke_interval < 0:
-            raise AirflowException("The poke_interval must be a non-negative 
number")
+            raise RuntimeError("The poke_interval must be a non-negative 
number")
         if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
-            raise AirflowException("The timeout must be a non-negative number")
+            raise RuntimeError("The timeout must be a non-negative number")

Review Comment:
   ```suggestion
               raise ValueError("The timeout must be a non-negative number")
   ```



##########
task-sdk/src/airflow/sdk/exceptions.py:
##########
@@ -18,15 +18,32 @@
 from __future__ import annotations
 
 import enum
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any
 
-from airflow.exceptions import AirflowException
 from airflow.sdk import TriggerRule
 
 if TYPE_CHECKING:
+    from collections.abc import Collection
+
+    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, 
AssetUriRef
     from airflow.sdk.execution_time.comms import ErrorResponse
 
 
+class AirflowException(Exception):

Review Comment:
   I wonder if it would be easier for future compatibility if we made this a subclass of `RuntimeError`? Not sure.
   
   ```suggestion
   class AirflowException(RuntimeError):
   ```



##########
providers/standard/src/airflow/providers/standard/example_dags/example_bash_decorator.py:
##########
@@ -19,12 +19,16 @@
 
 import pendulum
 
-from airflow.exceptions import AirflowSkipException
 from airflow.providers.common.compat.sdk import TriggerRule
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.utils.weekday import WeekDay
 from airflow.sdk import chain, dag, task
 
+try:
+    from airflow.sdk.exceptions import AirflowSkipException
+except ImportError:
+    from airflow.exceptions import AirflowSkipException  # type: 
ignore[no-redef,attr-defined]

Review Comment:
   I don't think we need compat in example dags -- on L25 we unconditionally 
import from airflow.sdk



##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float | 
timedelta) -> timedelta:
             return poke_interval
         if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
             return timedelta(seconds=poke_interval)
-        raise AirflowException(
-            "Operator arg `poke_interval` must be timedelta object or a 
non-negative number"
-        )
+        raise RuntimeError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")
 
     @staticmethod
     def _coerce_timeout(timeout: float | timedelta) -> timedelta:
         if isinstance(timeout, timedelta):
             return timeout
         if isinstance(timeout, (int, float)) and timeout >= 0:
             return timedelta(seconds=timeout)
-        raise AirflowException("Operator arg `timeout` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `timeout` must be timedelta object or 
a non-negative number")
 
     @staticmethod
     def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta | 
None:
         if max_wait is None or isinstance(max_wait, timedelta):
             return max_wait
         if isinstance(max_wait, (int, float)) and max_wait >= 0:
             return timedelta(seconds=max_wait)
-        raise AirflowException("Operator arg `max_wait` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `max_wait` must be timedelta object 
or a non-negative number")

Review Comment:
   ```suggestion
           raise ValueError("Operator arg `max_wait` must be timedelta object 
or a non-negative number")
   ```



##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float | 
timedelta) -> timedelta:
             return poke_interval
         if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
             return timedelta(seconds=poke_interval)
-        raise AirflowException(
-            "Operator arg `poke_interval` must be timedelta object or a 
non-negative number"
-        )
+        raise RuntimeError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")
 
     @staticmethod
     def _coerce_timeout(timeout: float | timedelta) -> timedelta:
         if isinstance(timeout, timedelta):
             return timeout
         if isinstance(timeout, (int, float)) and timeout >= 0:
             return timedelta(seconds=timeout)
-        raise AirflowException("Operator arg `timeout` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `timeout` must be timedelta object or 
a non-negative number")

Review Comment:
   ```suggestion
           raise ValueError("Operator arg `timeout` must be timedelta object or 
a non-negative number")
   ```



##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float | 
timedelta) -> timedelta:
             return poke_interval
         if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
             return timedelta(seconds=poke_interval)
-        raise AirflowException(
-            "Operator arg `poke_interval` must be timedelta object or a 
non-negative number"
-        )
+        raise RuntimeError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")

Review Comment:
   If we are changing it:
   
   ```suggestion
           raise ValueError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")
   ```
   
   Or maybe a `TypeError`, but it's probably not worth separating `TypeError` 
(wrong type) from `ValueError` (negative number).
   



##########
providers/standard/tests/unit/standard/operators/test_datetime.py:
##########
@@ -112,7 +112,7 @@ def test_no_target_time(self):
         """Check if BranchDateTimeOperator raises exception on missing 
target"""
         with pytest.raises(AirflowException):
             BranchDateTimeOperator(
-                task_id="datetime_branch",
+                task_id="datetime_branch_2",

Review Comment:
   Unrelated change?



##########
task-sdk/src/airflow/sdk/definitions/operator_resources.py:
##########
@@ -41,7 +40,7 @@ class Resource:
 
     def __init__(self, name, units_str, qty):
         if qty < 0:
-            raise AirflowException(
+            raise RuntimeError(

Review Comment:
   ```suggestion
               raise ValueError(
   ```



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -355,15 +353,13 @@ def __eq__(self, other):
         return True
 
     def get(self, conn_id: str, default_conn: Any = None) -> Any:
-        from airflow.exceptions import AirflowNotFoundException
-
         try:
             return _get_connection(conn_id)
         except AirflowRuntimeError as e:
             if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
                 return default_conn
             raise
-        except AirflowNotFoundException:
+        except RuntimeError:

Review Comment:
   I wonder if this is too broad an exception, and if we need to keep a 
dedicated exception class to throw.
   
   The thing I worry about here is the API request throwing with a 500 error, 
and instead of that error propagating up, it might end up falling back to the 
default conn, which I don't think is the right behaviour.



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1118,8 +1122,8 @@ def _handle_trigger_dag_run(
     ti.xcom_push(key="trigger_run_id", value=drte.dag_run_id)
 
     if drte.deferrable:
-        from airflow.exceptions import TaskDeferred
         from airflow.providers.standard.triggers.external_task import 
DagStateTrigger
+        from airflow.sdk.exceptions import TaskDeferred

Review Comment:
   This can be top level now since it's in the same module.



##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float | 
timedelta) -> timedelta:
             return poke_interval
         if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
             return timedelta(seconds=poke_interval)
-        raise AirflowException(
-            "Operator arg `poke_interval` must be timedelta object or a 
non-negative number"
-        )
+        raise RuntimeError("Operator arg `poke_interval` must be timedelta 
object or a non-negative number")
 
     @staticmethod
     def _coerce_timeout(timeout: float | timedelta) -> timedelta:
         if isinstance(timeout, timedelta):
             return timeout
         if isinstance(timeout, (int, float)) and timeout >= 0:
             return timedelta(seconds=timeout)
-        raise AirflowException("Operator arg `timeout` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `timeout` must be timedelta object or 
a non-negative number")
 
     @staticmethod
     def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta | 
None:
         if max_wait is None or isinstance(max_wait, timedelta):
             return max_wait
         if isinstance(max_wait, (int, float)) and max_wait >= 0:
             return timedelta(seconds=max_wait)
-        raise AirflowException("Operator arg `max_wait` must be timedelta 
object or a non-negative number")
+        raise RuntimeError("Operator arg `max_wait` must be timedelta object 
or a non-negative number")
 
     def _validate_input_values(self) -> None:
         if not isinstance(self.poke_interval, (int, float)) or 
self.poke_interval < 0:
-            raise AirflowException("The poke_interval must be a non-negative 
number")
+            raise RuntimeError("The poke_interval must be a non-negative 
number")
         if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
-            raise AirflowException("The timeout must be a non-negative number")
+            raise RuntimeError("The timeout must be a non-negative number")
         if self.mode not in self.valid_modes:
-            raise AirflowException(
+            raise RuntimeError(

Review Comment:
   ```suggestion
               raise ValueError(
   ```



##########
task-sdk/tests/task_sdk/bases/test_hook.py:
##########
@@ -85,7 +84,7 @@ def test_get_connection_not_found(self, 
sdk_connection_not_found):
         conn_id = "test_conn"
         hook = BaseHook()
 
-        with pytest.raises(AirflowNotFoundException, match="The conn_id 
`test_conn` isn't defined"):
+        with pytest.raises(RuntimeError, match="The conn_id `test_conn` isn't 
defined"):

Review Comment:
   See previous comment about `RuntimeError` vs a dedicated not-found error.



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -995,7 +999,7 @@ def _on_term(signum, frame):
         )
         state = TaskInstanceState.FAILED
         error = e
-    except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError) as e:
+    except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError, 
RuntimeError) as e:

Review Comment:
   If we re-order this to after the `AirflowTaskTerminated` case, couldn't this 
be simplified to `except Exception`?
   
   Now that I look at this, I don't think this case is needed at all -- isn't the 
`except BaseException as e:` we already have enough?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to