ashb commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2472656186
##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -21,29 +21,26 @@
from __future__ import annotations
-from collections.abc import Collection, Sequence
-from datetime import datetime, timedelta
from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, NamedTuple
+from typing import TYPE_CHECKING, NamedTuple
if TYPE_CHECKING:
from airflow.models import DagRun
- from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey,
AssetUriRef
- from airflow.utils.state import DagRunState
+try:
+ from airflow.sdk.exceptions import AirflowException
+except ModuleNotFoundError:
+ # The shared libraries are unable to see the 'sdk' package, so redefine
here
Review Comment:
I don't quite understand what this comment means — could you clarify it?
##########
providers/common/sql/tests/unit/common/sql/operators/test_sql.py:
##########
@@ -59,7 +59,11 @@
from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags,
clear_db_runs, clear_db_xcom
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
from tests_common.test_utils.providers import get_provider_min_airflow_version
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_1,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import (
+ AIRFLOW_V_3_0_1,
+ AIRFLOW_V_3_0_PLUS,
+ AirflowFailException,
Review Comment:
Ditto, `AirflowException` from this module? 🤷🏻
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -81,13 +76,18 @@
PodNotFoundException,
PodPhase,
)
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.cncf.kubernetes.version_compat import (
+ AIRFLOW_V_3_1_PLUS,
+ AirflowSkipException,
+ TaskDeferred,
+)
from airflow.providers.common.compat.sdk import XCOM_RETURN_KEY
if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models import BaseOperator
+from airflow.exceptions import AirflowException
Review Comment:
Shouldn't this be in `airflow.providers.cncf.kubernetes.version_compat` too?
##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float |
timedelta) -> timedelta:
return poke_interval
if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
return timedelta(seconds=poke_interval)
- raise AirflowException(
- "Operator arg `poke_interval` must be timedelta object or a
non-negative number"
- )
+ raise RuntimeError("Operator arg `poke_interval` must be timedelta
object or a non-negative number")
@staticmethod
def _coerce_timeout(timeout: float | timedelta) -> timedelta:
if isinstance(timeout, timedelta):
return timeout
if isinstance(timeout, (int, float)) and timeout >= 0:
return timedelta(seconds=timeout)
- raise AirflowException("Operator arg `timeout` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `timeout` must be timedelta object or
a non-negative number")
@staticmethod
def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta |
None:
if max_wait is None or isinstance(max_wait, timedelta):
return max_wait
if isinstance(max_wait, (int, float)) and max_wait >= 0:
return timedelta(seconds=max_wait)
- raise AirflowException("Operator arg `max_wait` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `max_wait` must be timedelta object
or a non-negative number")
def _validate_input_values(self) -> None:
if not isinstance(self.poke_interval, (int, float)) or
self.poke_interval < 0:
- raise AirflowException("The poke_interval must be a non-negative
number")
+ raise RuntimeError("The poke_interval must be a non-negative
number")
Review Comment:
```suggestion
raise ValueError("The poke_interval must be a non-negative number")
```
##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float |
timedelta) -> timedelta:
return poke_interval
if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
return timedelta(seconds=poke_interval)
- raise AirflowException(
- "Operator arg `poke_interval` must be timedelta object or a
non-negative number"
- )
+ raise RuntimeError("Operator arg `poke_interval` must be timedelta
object or a non-negative number")
@staticmethod
def _coerce_timeout(timeout: float | timedelta) -> timedelta:
if isinstance(timeout, timedelta):
return timeout
if isinstance(timeout, (int, float)) and timeout >= 0:
return timedelta(seconds=timeout)
- raise AirflowException("Operator arg `timeout` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `timeout` must be timedelta object or
a non-negative number")
@staticmethod
def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta |
None:
if max_wait is None or isinstance(max_wait, timedelta):
return max_wait
if isinstance(max_wait, (int, float)) and max_wait >= 0:
return timedelta(seconds=max_wait)
- raise AirflowException("Operator arg `max_wait` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `max_wait` must be timedelta object
or a non-negative number")
def _validate_input_values(self) -> None:
if not isinstance(self.poke_interval, (int, float)) or
self.poke_interval < 0:
- raise AirflowException("The poke_interval must be a non-negative
number")
+ raise RuntimeError("The poke_interval must be a non-negative
number")
if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
- raise AirflowException("The timeout must be a non-negative number")
+ raise RuntimeError("The timeout must be a non-negative number")
Review Comment:
```suggestion
raise ValueError("The timeout must be a non-negative number")
```
##########
task-sdk/src/airflow/sdk/exceptions.py:
##########
@@ -18,15 +18,32 @@
from __future__ import annotations
import enum
+from http import HTTPStatus
from typing import TYPE_CHECKING, Any
-from airflow.exceptions import AirflowException
from airflow.sdk import TriggerRule
if TYPE_CHECKING:
+ from collections.abc import Collection
+
+ from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey,
AssetUriRef
from airflow.sdk.execution_time.comms import ErrorResponse
+class AirflowException(Exception):
Review Comment:
I wonder whether future compatibility would be easier if we made this subclass `RuntimeError` instead of `Exception`? Not sure.
```suggestion
class AirflowException(RuntimeError):
```
##########
providers/standard/src/airflow/providers/standard/example_dags/example_bash_decorator.py:
##########
@@ -19,12 +19,16 @@
import pendulum
-from airflow.exceptions import AirflowSkipException
from airflow.providers.common.compat.sdk import TriggerRule
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.utils.weekday import WeekDay
from airflow.sdk import chain, dag, task
+try:
+ from airflow.sdk.exceptions import AirflowSkipException
+except ImportError:
+ from airflow.exceptions import AirflowSkipException # type:
ignore[no-redef,attr-defined]
Review Comment:
I don't think we need compat code in example DAGs — on L25 we already unconditionally import from `airflow.sdk`.
##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float |
timedelta) -> timedelta:
return poke_interval
if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
return timedelta(seconds=poke_interval)
- raise AirflowException(
- "Operator arg `poke_interval` must be timedelta object or a
non-negative number"
- )
+ raise RuntimeError("Operator arg `poke_interval` must be timedelta
object or a non-negative number")
@staticmethod
def _coerce_timeout(timeout: float | timedelta) -> timedelta:
if isinstance(timeout, timedelta):
return timeout
if isinstance(timeout, (int, float)) and timeout >= 0:
return timedelta(seconds=timeout)
- raise AirflowException("Operator arg `timeout` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `timeout` must be timedelta object or
a non-negative number")
@staticmethod
def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta |
None:
if max_wait is None or isinstance(max_wait, timedelta):
return max_wait
if isinstance(max_wait, (int, float)) and max_wait >= 0:
return timedelta(seconds=max_wait)
- raise AirflowException("Operator arg `max_wait` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `max_wait` must be timedelta object
or a non-negative number")
Review Comment:
```suggestion
raise ValueError("Operator arg `max_wait` must be timedelta object or a non-negative number")
```
##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float |
timedelta) -> timedelta:
return poke_interval
if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
return timedelta(seconds=poke_interval)
- raise AirflowException(
- "Operator arg `poke_interval` must be timedelta object or a
non-negative number"
- )
+ raise RuntimeError("Operator arg `poke_interval` must be timedelta
object or a non-negative number")
@staticmethod
def _coerce_timeout(timeout: float | timedelta) -> timedelta:
if isinstance(timeout, timedelta):
return timeout
if isinstance(timeout, (int, float)) and timeout >= 0:
return timedelta(seconds=timeout)
- raise AirflowException("Operator arg `timeout` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `timeout` must be timedelta object or
a non-negative number")
Review Comment:
```suggestion
raise ValueError("Operator arg `timeout` must be timedelta object or a non-negative number")
```
##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float |
timedelta) -> timedelta:
return poke_interval
if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
return timedelta(seconds=poke_interval)
- raise AirflowException(
- "Operator arg `poke_interval` must be timedelta object or a
non-negative number"
- )
+ raise RuntimeError("Operator arg `poke_interval` must be timedelta
object or a non-negative number")
Review Comment:
If we are changing it:
```suggestion
raise ValueError("Operator arg `poke_interval` must be timedelta object or a non-negative number")
```
Or maybe a `TypeError`, but it's probably not worth separating `TypeError` (wrong type) from `ValueError` (negative number) here.
##########
providers/standard/tests/unit/standard/operators/test_datetime.py:
##########
@@ -112,7 +112,7 @@ def test_no_target_time(self):
"""Check if BranchDateTimeOperator raises exception on missing
target"""
with pytest.raises(AirflowException):
BranchDateTimeOperator(
- task_id="datetime_branch",
+ task_id="datetime_branch_2",
Review Comment:
Unrelated change?
##########
task-sdk/src/airflow/sdk/definitions/operator_resources.py:
##########
@@ -41,7 +40,7 @@ class Resource:
def __init__(self, name, units_str, qty):
if qty < 0:
- raise AirflowException(
+ raise RuntimeError(
Review Comment:
```suggestion
raise ValueError(
```
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -355,15 +353,13 @@ def __eq__(self, other):
return True
def get(self, conn_id: str, default_conn: Any = None) -> Any:
- from airflow.exceptions import AirflowNotFoundException
-
try:
return _get_connection(conn_id)
except AirflowRuntimeError as e:
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
return default_conn
raise
- except AirflowNotFoundException:
+ except RuntimeError:
Review Comment:
I wonder if catching `RuntimeError` here is too broad, and whether we need to keep a dedicated exception class to raise instead.
My worry is the API request failing with a 500 error: instead of that error propagating up, we might end up silently falling back to the default conn, which I don't think is the right behaviour.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1118,8 +1122,8 @@ def _handle_trigger_dag_run(
ti.xcom_push(key="trigger_run_id", value=drte.dag_run_id)
if drte.deferrable:
- from airflow.exceptions import TaskDeferred
from airflow.providers.standard.triggers.external_task import
DagStateTrigger
+ from airflow.sdk.exceptions import TaskDeferred
Review Comment:
This can be top level now since it's in the same module.
##########
task-sdk/src/airflow/sdk/bases/sensor.py:
##########
@@ -145,40 +145,38 @@ def _coerce_poke_interval(poke_interval: float |
timedelta) -> timedelta:
return poke_interval
if isinstance(poke_interval, (int, float)) and poke_interval >= 0:
return timedelta(seconds=poke_interval)
- raise AirflowException(
- "Operator arg `poke_interval` must be timedelta object or a
non-negative number"
- )
+ raise RuntimeError("Operator arg `poke_interval` must be timedelta
object or a non-negative number")
@staticmethod
def _coerce_timeout(timeout: float | timedelta) -> timedelta:
if isinstance(timeout, timedelta):
return timeout
if isinstance(timeout, (int, float)) and timeout >= 0:
return timedelta(seconds=timeout)
- raise AirflowException("Operator arg `timeout` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `timeout` must be timedelta object or
a non-negative number")
@staticmethod
def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta |
None:
if max_wait is None or isinstance(max_wait, timedelta):
return max_wait
if isinstance(max_wait, (int, float)) and max_wait >= 0:
return timedelta(seconds=max_wait)
- raise AirflowException("Operator arg `max_wait` must be timedelta
object or a non-negative number")
+ raise RuntimeError("Operator arg `max_wait` must be timedelta object
or a non-negative number")
def _validate_input_values(self) -> None:
if not isinstance(self.poke_interval, (int, float)) or
self.poke_interval < 0:
- raise AirflowException("The poke_interval must be a non-negative
number")
+ raise RuntimeError("The poke_interval must be a non-negative
number")
if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
- raise AirflowException("The timeout must be a non-negative number")
+ raise RuntimeError("The timeout must be a non-negative number")
if self.mode not in self.valid_modes:
- raise AirflowException(
+ raise RuntimeError(
Review Comment:
```suggestion
raise ValueError(
```
##########
task-sdk/tests/task_sdk/bases/test_hook.py:
##########
@@ -85,7 +84,7 @@ def test_get_connection_not_found(self,
sdk_connection_not_found):
conn_id = "test_conn"
hook = BaseHook()
- with pytest.raises(AirflowNotFoundException, match="The conn_id
`test_conn` isn't defined"):
+ with pytest.raises(RuntimeError, match="The conn_id `test_conn` isn't
defined"):
Review Comment:
See previous comment about Runtime vs dedicated not found error.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -995,7 +999,7 @@ def _on_term(signum, frame):
)
state = TaskInstanceState.FAILED
error = e
- except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError) as e:
+ except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError,
RuntimeError) as e:
Review Comment:
If we re-order this to come after the `AirflowTaskTerminated` case, couldn't this be simplified to `except Exception`?
And now that I look at this again, I don't think this case is needed at all — isn't the `except BaseException as e:` we already have enough?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]