amoghrajesh commented on code in PR #54505:
URL: https://github.com/apache/airflow/pull/54505#discussion_r2558864280


##########
task-sdk/src/airflow/sdk/definitions/param.py:
##########
@@ -297,7 +297,7 @@ def resolve(self, context: Context) -> Any:
             return self._default
         with contextlib.suppress(KeyError):
             return context["params"][self._name]
-        raise AirflowException(f"No value could be resolved for parameter 
{self._name}")
+        raise RuntimeError(f"No value could be resolved for parameter 
{self._name}")

Review Comment:
   This seems risky in terms of compat, I can imagine someone doing a 
`Param.resolve` -> earlier it used to throw AirflowException, now it is 
`RuntimeError`, their catch code will fail



##########
airflow-core/tests/unit/dags/test_on_failure_callback.py:
##########
@@ -19,10 +19,10 @@
 import os
 from datetime import datetime
 
-from airflow.exceptions import AirflowFailException
 from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk.exceptions import AirflowFailException

Review Comment:
   Same here



##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -247,14 +246,14 @@ def add(self, task: DAGNode) -> DAGNode:
         if isinstance(task, TaskGroup):
             if self.dag:
                 if task.dag is not None and self.dag is not task.dag:
-                    raise RuntimeError(
+                    raise ValueError(
                         "Cannot mix TaskGroups from different Dags: %s and %s",
                         self.dag,
                         task.dag,
                     )
                 task.dag = self.dag
             if task.children:
-                raise AirflowException("Cannot add a non-empty TaskGroup")
+                raise ValueError("Cannot add a non-empty TaskGroup")

Review Comment:
   Same with this one? I can imagine someone doing this:
   ```python
       with DAG("test_dag", schedule=None, 
start_date=pendulum.parse("20200101")):
           with TaskGroup("section_with_tasks") as tg_with_children:
               task1 = EmptyOperator(task_id="task1")
   
           parent_tg = TaskGroup("parent_section")
           parent_tg.add(tg_with_children)
   ```
   
   This would have thrown AirflowException earlier, now its ValueError



##########
task-sdk/src/airflow/sdk/definitions/operator_resources.py:
##########
@@ -41,7 +40,7 @@ class Resource:
 
     def __init__(self, name, units_str, qty):
         if qty < 0:
-            raise AirflowException(
+            raise ValueError(

Review Comment:
   Since this one is public facing too, maybe we should consider compat here or 
document it well in a newsfragment: ```python
   Resource(name="res1", qty=-1, units_str="MB")
   Traceback (most recent call last):
     File 
"/Users/amoghdesai/Documents/OSS/repos/airflow/.venv/lib/python3.13/site-packages/IPython/core/interactiveshell.py",
 line 3699, in run_code
       exec(code_obj, self.user_global_ns, self.user_ns)
       ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "<ipython-input-6-eef0e426632d>", line 1, in <module>
       Resource(name="res1", qty=-1, units_str="MB")
       ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/Users/amoghdesai/Documents/OSS/repos/airflow/task-sdk/src/airflow/sdk/definitions/operator_resources.py",
 line 43, in __init__
       raise ValueError(
       ...<2 lines>...
       )
   ValueError: Received resource quantity -1 for resource res1, but resource 
quantity must be non-negative.
   ```
   
   Earlier it would have been AirflowException



##########
airflow-core/tests/unit/dags/test_assets.py:
##########
@@ -19,11 +19,11 @@
 
 from datetime import datetime
 
-from airflow.exceptions import AirflowFailException, AirflowSkipException
 from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.exceptions import AirflowFailException, AirflowSkipException

Review Comment:
   We should not be importing from airflow sdk in core, even in tests if 
possible. Since a compat shim is present, we should just import from core
   ```
   from airflow.exceptions import AirflowFailException
   <ipython-input-3-b7c9ca91b112>:1 DeprecationWarning: 
airflow.exceptions.AirflowFailException is deprecated. Use 
airflow.sdk.exceptions.AirflowFailException instead.
   ```



##########
airflow-core/src/airflow/exceptions.py:
##########
@@ -518,23 +303,43 @@ def __init__(self, dag_id: str | None = None, message: 
str | None = None):
             super().__init__(f"An unexpected error occurred while trying to 
deserialize Dag '{dag_id}'")
 
 
+class AirflowClearRunningTaskException(AirflowException):
+    """Raise when the user attempts to clear currently running tasks."""
+
+
+_DEPRECATED_EXCEPTIONS = {
+    "AirflowTaskTerminated": "airflow.sdk.exceptions.AirflowTaskTerminated",
+    "DuplicateTaskIdFound": "airflow.sdk.exceptions.DuplicateTaskIdFound",
+    "FailFastDagInvalidTriggerRule": 
"airflow.sdk.exceptions.FailFastDagInvalidTriggerRule",
+    "TaskAlreadyInTaskGroup": "airflow.sdk.exceptions.TaskAlreadyInTaskGroup",
+    "TaskDeferralTimeout": "airflow.sdk.exceptions.TaskDeferralTimeout",
+    "XComNotFound": "airflow.sdk.exceptions.XComNotFound",
+    "DownstreamTasksSkipped": "airflow.sdk.exceptions.DownstreamTasksSkipped",
+    "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+    "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+    "TaskDeferralError": "airflow.sdk.exceptions.TaskDeferralError",
+    "AirflowDagCycleException": 
"airflow.sdk.exceptions.AirflowDagCycleException",
+    "AirflowInactiveAssetInInletOrOutletException": 
"airflow.sdk.exceptions.AirflowInactiveAssetInInletOrOutletException",
+    "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
+    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+    "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
+    "ParamValidationError": "airflow.sdk.exceptions.ParamValidationError",
+    "TaskDeferred": "airflow.sdk.exceptions.TaskDeferred",
+}
+
+
 def __getattr__(name: str):
     """Provide backward compatibility for moved exceptions."""
-    if name == "AirflowDagCycleException":
+    if name in _DEPRECATED_EXCEPTIONS:
         import warnings
 
-        from airflow.sdk.exceptions import AirflowDagCycleException
+        from airflow.utils.module_loading import import_string
 
+        target_path = _DEPRECATED_EXCEPTIONS[name]
         warnings.warn(
-            "airflow.exceptions.AirflowDagCycleException is deprecated. "
-            "Use airflow.sdk.exceptions.AirflowDagCycleException instead.",
+            f"airflow.exceptions.{name} is deprecated. Use {target_path} 
instead.",
             DeprecationWarning,

Review Comment:
   This will not show up in task logs, we need it to show if there is a task 
like this:
   ```python
   from airflow.exceptions import AirflowException
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   def print_hello():
       raise AirflowException("hi i am AirflowException!")
   ```
   
   You need to use `DeprecatedImportWarning` like this: 
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dag.py#L755-L774



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to