amoghrajesh commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2556251493
##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -518,23 +303,43 @@ def __init__(self, dag_id: str | None = None, message:
str | None = None):
super().__init__(f"An unexpected error occurred while trying to
deserialize Dag '{dag_id}'")
+class AirflowClearRunningTaskException(AirflowException):
+ """Raise when the user attempts to clear currently running tasks."""
+
+
+_DEPRECATED_EXCEPTIONS = {
+ "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
+ "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
+ "FailFastDagInvalidTriggerRule":
"airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
+ "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
+ "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
+ "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
+ "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
+ "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+ "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+ "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
+ "AirflowDagCycleException":
"airflow.sdk.exceptions.AirflowDagCycleException",
+ "AirflowInactiveAssetInInletOrOutletException":
"airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
+ "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
+ "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+ "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
+ "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
+ "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+}
+
+
def __getattr__(name: str):
"""Provide backward compatibility for moved exceptions."""
- if name == "AirflowDagCycleException":
+ if name in _DEPRECATED_EXCEPTIONS:
import warnings
- from airflow.sdk.exceptions import AirflowDagCycleException
+ from airflow.utils.module_loading import import_string
+ target_path = _DEPRECATED_EXCEPTIONS[name]
warnings.warn(
- "airflow.exceptions.AirflowDagCycleException is deprecated. "
- "Use airflow.sdk.exceptions.AirflowDagCycleException instead.",
+ f"airflow.exceptions.{name} is deprecated. Use {target_path}
instead.",
DeprecationWarning,
stacklevel=2,
)
- return AirflowDagCycleException
-
+ return import_string(target_path)
Review Comment:
Looks far better now!
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py:
##########
@@ -16,9 +16,7 @@
# under the License.
from __future__ import annotations
-from airflow.exceptions import (
- AirflowException,
-)
+from airflow.exceptions import AirflowException
Review Comment:
I left a comment about this earlier, but it can be done in a follow-up; there
is no need to do it in this PR.
##########
task-sdk/src/airflow/sdk/exceptions.py:
##########
@@ -18,15 +18,38 @@
from __future__ import annotations
import enum
+from http import HTTPStatus
from typing import TYPE_CHECKING, Any
-from airflow.exceptions import AirflowException
from airflow.sdk import TriggerRule
if TYPE_CHECKING:
+ from collections.abc import Collection
+
+ from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey,
AssetUriRef
from airflow.sdk.execution_time.comms import ErrorResponse
+class AirflowException(RuntimeError):
Review Comment:
I have just one question regarding backward compatibility here. If there is
custom provider code like this:
```python
Python 3.13.3 (main, Apr 8 2025, 13:54:08) [Clang 17.0.0
(clang-1700.0.13.3)] on darwin
class AirflowExceptionOld(Exception):
pass
def submit_to_backend():
raise AirflowException("Invalid job config")
def run_job():
try:
submit_to_backend()
except RuntimeError:
return "retry"
except AirflowException:
return "validation_failed"
AirflowException = AirflowExceptionOld
run_job()
Out[6]: 'validation_failed'
class AirflowExceptionNew(RuntimeError):
pass
AirflowException = AirflowExceptionNew
run_job()
Out[9]: 'retry'
```
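Previously this would have executed the `except AirflowException` branch
(returning 'validation_failed'), but now that `AirflowException` subclasses
`RuntimeError`, the earlier `except RuntimeError` branch catches it first and
returns 'retry' instead.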
##########
task-sdk/src/airflow/sdk/definitions/operator_resources.py:
##########
@@ -41,7 +40,7 @@ class Resource:
def __init__(self, name, units_str, qty):
if qty < 0:
- raise AirflowException(
+ raise RuntimeError(
Review Comment:
Shouldn't this be a ValueError?
##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -254,7 +253,7 @@ def add(self, task: DAGNode) -> DAGNode:
)
task.dag = self.dag
if task.children:
- raise AirflowException("Cannot add a non-empty TaskGroup")
+ raise RuntimeError("Cannot add a non-empty TaskGroup")
Review Comment:
Should this be a ValueError?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]