This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3e340797ab Make param validation consistent for DAG validation and triggering (#34248)
3e340797ab is described below
commit 3e340797ab98a06b51b2930610b0abb0ad20a750
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Sep 10 09:03:13 2023 +0200
Make param validation consistent for DAG validation and triggering (#34248)
---
airflow/models/dag.py | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2cdcfc197a..ceb2ca2eb7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -84,6 +84,7 @@ from airflow.exceptions import (
AirflowSkipException,
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
+ ParamValidationError,
RemovedInAirflow3Warning,
TaskNotFound,
)
@@ -3276,20 +3277,20 @@ class DAG(LoggingMixin):
def validate_schedule_and_params(self):
"""
- Validate Param values when the schedule_interval is not None.
+ Validate Param values when the DAG has schedule defined.
- Raise exception if there are any Params in the DAG which neither have a default value nor
- have the null in schema['type'] list, but the DAG have a schedule_interval which is not None.
+ Raise exception if there are any Params which can not be resolved by their schema definition.
"""
if not self.timetable.can_be_scheduled:
return
- for v in self.params.values():
- # As type can be an array, we would check if `null` is an allowed type or not
- if not v.has_value and ("type" not in v.schema or "null" not in v.schema["type"]):
- raise AirflowException(
- "DAG Schedule must be None, if there are any required params without default values"
- )
+ try:
+ self.params.validate()
+ except ParamValidationError as pverr:
+ raise AirflowException(
+ "DAG is not allowed to define a Schedule, "
+ "if there are any required params without default values or default values are not valid."
+ ) from pverr
def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]:
"""