This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2f8b9fce27 Make SerializedParam.resolve not raise by default (#56923)
b2f8b9fce27 is described below

commit b2f8b9fce27935c9ddd5c0bc98d6d1daf6fd37d3
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Oct 23 12:13:18 2025 +0800

    Make SerializedParam.resolve not raise by default (#56923)
---
 .../src/airflow/serialization/definitions/param.py | 77 +++++++---------------
 airflow-core/tests/unit/models/test_dag.py         |  8 +--
 2 files changed, 27 insertions(+), 58 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/definitions/param.py b/airflow-core/src/airflow/serialization/definitions/param.py
index e01a0844374..29d173c61a5 100644
--- a/airflow-core/src/airflow/serialization/definitions/param.py
+++ b/airflow-core/src/airflow/serialization/definitions/param.py
@@ -20,78 +20,50 @@ from __future__ import annotations
 
 import collections.abc
 import copy
-import json
 from typing import TYPE_CHECKING, Any
 
-from airflow.exceptions import ParamValidationError
 from airflow.serialization.definitions.notset import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     from collections.abc import Iterator, Mapping
 
 
-def _check_json(value):
-    try:
-        json.dumps(value)
-    except Exception:
-        raise ParamValidationError(
-            f"All provided parameters must be JSON-serializable. The value '{value}' is not."
-        )
-
-
 class SerializedParam:
-    """Server-side Param class for deserialization."""
+    """Server-side param class for deserialization."""
 
-    def __init__(self, default: Any = None, description: str | None = None, **schema):
+    def __init__(self, default: Any = NOTSET, description: str | None = None, **schema):
         # No validation needed - the SDK already validated the default.
         self.value = default
         self.description = description
         self.schema = schema
 
-    def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
+    def resolve(self, *, raises: bool = False) -> Any:
         """
-        Run the validations and returns the Param's final value.
+        Run the validations and returns the param's final value.
 
-        May raise ValueError on failed validations, or TypeError
-        if no value is passed and no value already exists.
-        We first check that value is json-serializable; if not, warn.
-        In future release we will require the value to be json-serializable.
+        Different from SDK Param, this function never raises by default. *None*
+        is returned if validation fails, no value is available, or the return
+        value is not JSON-serializable.
 
-        :param value: The value to be updated for the Param
-        :param suppress_exception: To raise an exception or not when validation
-            fails. If true and validations fails, *None* is returned.
+        :param raises: All exceptions during validation are suppressed by
+            default. They are only raised if this is set to *True* instead.
         """
         import jsonschema
-        from jsonschema import FormatChecker
-        from jsonschema.exceptions import ValidationError
 
-        if not isinstance(value, ArgNotSet):
-            try:
-                _check_json(value)
-            except ParamValidationError:
-                if suppress_exception:
-                    return None
-                raise
-            final_val = value
-        elif isinstance(self.value, ArgNotSet):
-            if suppress_exception:
-                return None
-            raise ParamValidationError("No value passed and Param has no default value")
-        else:
-            final_val = self.value
         try:
-            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
-        except ValidationError as err:
-            if suppress_exception:
+            if isinstance(value := self.value, ArgNotSet):
+                raise ValueError("No value passed")
+            jsonschema.validate(value, self.schema, format_checker=jsonschema.FormatChecker())
+        except Exception:
+            if not raises:
                 return None
-            raise ParamValidationError(err) from None
-        self.value = final_val
-        return final_val
+            raise
+        return value
 
     def dump(self) -> dict[str, Any]:
         """Return the full param spec for API consumers."""
         return {
-            "value": None if isinstance(self.value, ArgNotSet) else self.value,
+            "value": self.resolve(),
             "schema": self.schema,
             "description": self.description,
         }
@@ -115,12 +87,11 @@ class SerializedParamsDict(collections.abc.Mapping[str, Any]):
 
     __dict: dict[str, SerializedParam]
 
-    def __init__(self, d: Mapping[str, Any] | None = None, *, suppress_exception: bool = False) -> None:
+    def __init__(self, d: Mapping[str, Any] | None = None) -> None:
         self.__dict = dict(_collect_params(d))
-        self.suppress_exception = suppress_exception
 
     def __eq__(self, other: Any) -> bool:
-        """Compare ParamsDict objects using their dumped content, matching SDK behavior."""
+        """Compare params dicts using their dumped content, matching SDK behavior."""
         if hasattr(other, "dump"):  # ParamsDict or SerializedParamsDict
             return self.dump() == other.dump()
         if isinstance(other, collections.abc.Mapping):
@@ -155,19 +126,19 @@ class SerializedParamsDict(collections.abc.Mapping[str, Any]):
         return collections.abc.ValuesView(self.__dict)
 
     def validate(self) -> dict[str, Any]:
-        """Validate & returns all the Params object stored in the dictionary."""
+        """Validate & returns all the params stored in the dictionary."""
 
         def _validate_one(k: str, v: SerializedParam):
             try:
-                return v.resolve(suppress_exception=self.suppress_exception)
-            except ParamValidationError as e:
-                raise ParamValidationError(f"Invalid input for param {k}: {e}") from None
+                return v.resolve(raises=True)
+            except Exception as e:
+                raise ValueError(f"Invalid input for param {k}: {e}") from None
 
         return {k: _validate_one(k, v) for k, v in self.__dict.items()}
 
     def dump(self) -> Mapping[str, Any]:
         """Dump the resolved values as a mapping."""
-        return {k: v.resolve(suppress_exception=True) for k, v in self.__dict.items()}
+        return {k: v.resolve() for k, v in self.__dict.items()}
 
     def deep_merge(self, data: Mapping[str, Any] | None) -> SerializedParamsDict:
         """Create a new params dict by merging incoming data into this params dict."""
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 09835f58e2c..389ce091ea7 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -38,7 +38,7 @@ from airflow._shared.timezones import timezone
 from airflow._shared.timezones.timezone import datetime as datetime_tz
 from airflow.configuration import conf
 from airflow.dag_processing.dagbag import DagBag
-from airflow.exceptions import AirflowException, ParamValidationError
+from airflow.exceptions import AirflowException
 from airflow.models.asset import (
     AssetAliasModel,
     AssetDagRunQueue,
@@ -1733,7 +1733,7 @@ my_postgres_conn:
 
     def test_validate_params_on_trigger_dag(self, testing_dag_bundle):
         dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
-        with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):
+        with pytest.raises(ValueError, match="No value passed"):
             sync_dag_to_db(dag).create_dagrun(
                 run_id="test_dagrun_missing_param",
                 run_type=DagRunType.MANUAL,
@@ -1745,9 +1745,7 @@ my_postgres_conn:
             )
 
         dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
-        with pytest.raises(
-            ParamValidationError, match="Invalid input for param param1: None is not of type 'string'"
-        ):
+        with pytest.raises(ValueError, match="None is not of type 'string'"):
             sync_dag_to_db(dag).create_dagrun(
                 run_id="test_dagrun_missing_param",
                 run_type=DagRunType.MANUAL,

Reply via email to