This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b2f8b9fce27 Make SerializedParam.resolve not raise by default (#56923)
b2f8b9fce27 is described below
commit b2f8b9fce27935c9ddd5c0bc98d6d1daf6fd37d3
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Oct 23 12:13:18 2025 +0800
Make SerializedParam.resolve not raise by default (#56923)
---
.../src/airflow/serialization/definitions/param.py | 77 +++++++---------------
airflow-core/tests/unit/models/test_dag.py | 8 +--
2 files changed, 27 insertions(+), 58 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/definitions/param.py
b/airflow-core/src/airflow/serialization/definitions/param.py
index e01a0844374..29d173c61a5 100644
--- a/airflow-core/src/airflow/serialization/definitions/param.py
+++ b/airflow-core/src/airflow/serialization/definitions/param.py
@@ -20,78 +20,50 @@ from __future__ import annotations
import collections.abc
import copy
-import json
from typing import TYPE_CHECKING, Any
-from airflow.exceptions import ParamValidationError
from airflow.serialization.definitions.notset import NOTSET, ArgNotSet
if TYPE_CHECKING:
from collections.abc import Iterator, Mapping
-def _check_json(value):
- try:
- json.dumps(value)
- except Exception:
- raise ParamValidationError(
- f"All provided parameters must be JSON-serializable. The value
'{value}' is not."
- )
-
-
class SerializedParam:
- """Server-side Param class for deserialization."""
+ """Server-side param class for deserialization."""
- def __init__(self, default: Any = None, description: str | None = None,
**schema):
+ def __init__(self, default: Any = NOTSET, description: str | None = None,
**schema):
# No validation needed - the SDK already validated the default.
self.value = default
self.description = description
self.schema = schema
- def resolve(self, value: Any = NOTSET, suppress_exception: bool = False)
-> Any:
+ def resolve(self, *, raises: bool = False) -> Any:
"""
- Run the validations and returns the Param's final value.
+ Run the validations and returns the param's final value.
- May raise ValueError on failed validations, or TypeError
- if no value is passed and no value already exists.
- We first check that value is json-serializable; if not, warn.
- In future release we will require the value to be json-serializable.
+ Different from SDK Param, this function never raises by default. *None*
+ is returned if validation fails, no value is available, or the return
+ value is not JSON-serializable.
- :param value: The value to be updated for the Param
- :param suppress_exception: To raise an exception or not when validation
- fails. If true and validations fails, *None* is returned.
+ :param raises: All exceptions during validation are suppressed by
+ default. They are only raised if this is set to *True* instead.
"""
import jsonschema
- from jsonschema import FormatChecker
- from jsonschema.exceptions import ValidationError
- if not isinstance(value, ArgNotSet):
- try:
- _check_json(value)
- except ParamValidationError:
- if suppress_exception:
- return None
- raise
- final_val = value
- elif isinstance(self.value, ArgNotSet):
- if suppress_exception:
- return None
- raise ParamValidationError("No value passed and Param has no
default value")
- else:
- final_val = self.value
try:
- jsonschema.validate(final_val, self.schema,
format_checker=FormatChecker())
- except ValidationError as err:
- if suppress_exception:
+ if isinstance(value := self.value, ArgNotSet):
+ raise ValueError("No value passed")
+ jsonschema.validate(value, self.schema,
format_checker=jsonschema.FormatChecker())
+ except Exception:
+ if not raises:
return None
- raise ParamValidationError(err) from None
- self.value = final_val
- return final_val
+ raise
+ return value
def dump(self) -> dict[str, Any]:
"""Return the full param spec for API consumers."""
return {
- "value": None if isinstance(self.value, ArgNotSet) else self.value,
+ "value": self.resolve(),
"schema": self.schema,
"description": self.description,
}
@@ -115,12 +87,11 @@ class SerializedParamsDict(collections.abc.Mapping[str,
Any]):
__dict: dict[str, SerializedParam]
- def __init__(self, d: Mapping[str, Any] | None = None, *,
suppress_exception: bool = False) -> None:
+ def __init__(self, d: Mapping[str, Any] | None = None) -> None:
self.__dict = dict(_collect_params(d))
- self.suppress_exception = suppress_exception
def __eq__(self, other: Any) -> bool:
- """Compare ParamsDict objects using their dumped content, matching SDK
behavior."""
+ """Compare params dicts using their dumped content, matching SDK
behavior."""
if hasattr(other, "dump"): # ParamsDict or SerializedParamsDict
return self.dump() == other.dump()
if isinstance(other, collections.abc.Mapping):
@@ -155,19 +126,19 @@ class SerializedParamsDict(collections.abc.Mapping[str,
Any]):
return collections.abc.ValuesView(self.__dict)
def validate(self) -> dict[str, Any]:
- """Validate & returns all the Params object stored in the
dictionary."""
+ """Validate & returns all the params stored in the dictionary."""
def _validate_one(k: str, v: SerializedParam):
try:
- return v.resolve(suppress_exception=self.suppress_exception)
- except ParamValidationError as e:
- raise ParamValidationError(f"Invalid input for param {k}:
{e}") from None
+ return v.resolve(raises=True)
+ except Exception as e:
+ raise ValueError(f"Invalid input for param {k}: {e}") from None
return {k: _validate_one(k, v) for k, v in self.__dict.items()}
def dump(self) -> Mapping[str, Any]:
"""Dump the resolved values as a mapping."""
- return {k: v.resolve(suppress_exception=True) for k, v in
self.__dict.items()}
+ return {k: v.resolve() for k, v in self.__dict.items()}
def deep_merge(self, data: Mapping[str, Any] | None) ->
SerializedParamsDict:
"""Create a new params dict by merging incoming data into this params
dict."""
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 09835f58e2c..389ce091ea7 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -38,7 +38,7 @@ from airflow._shared.timezones import timezone
from airflow._shared.timezones.timezone import datetime as datetime_tz
from airflow.configuration import conf
from airflow.dag_processing.dagbag import DagBag
-from airflow.exceptions import AirflowException, ParamValidationError
+from airflow.exceptions import AirflowException
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
@@ -1733,7 +1733,7 @@ my_postgres_conn:
def test_validate_params_on_trigger_dag(self, testing_dag_bundle):
dag = DAG("dummy-dag", schedule=None, params={"param1":
Param(type="string")})
- with pytest.raises(ParamValidationError, match="No value passed and
Param has no default value"):
+ with pytest.raises(ValueError, match="No value passed"):
sync_dag_to_db(dag).create_dagrun(
run_id="test_dagrun_missing_param",
run_type=DagRunType.MANUAL,
@@ -1745,9 +1745,7 @@ my_postgres_conn:
)
dag = DAG("dummy-dag", schedule=None, params={"param1":
Param(type="string")})
- with pytest.raises(
- ParamValidationError, match="Invalid input for param param1: None
is not of type 'string'"
- ):
+ with pytest.raises(ValueError, match="None is not of type 'string'"):
sync_dag_to_db(dag).create_dagrun(
run_id="test_dagrun_missing_param",
run_type=DagRunType.MANUAL,