This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e340797ab Make param validation consistent for DAG validation and triggering (#34248)
3e340797ab is described below

commit 3e340797ab98a06b51b2930610b0abb0ad20a750
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Sep 10 09:03:13 2023 +0200

    Make param validation consistent for DAG validation and triggering (#34248)
---
 airflow/models/dag.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2cdcfc197a..ceb2ca2eb7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -84,6 +84,7 @@ from airflow.exceptions import (
     AirflowSkipException,
     DuplicateTaskIdFound,
     FailStopDagInvalidTriggerRule,
+    ParamValidationError,
     RemovedInAirflow3Warning,
     TaskNotFound,
 )
@@ -3276,20 +3277,20 @@ class DAG(LoggingMixin):
 
     def validate_schedule_and_params(self):
         """
-        Validate Param values when the schedule_interval is not None.
+        Validate Param values when the DAG has schedule defined.
 
-        Raise exception if there are any Params in the DAG which neither have a default value nor
-        have the null in schema['type'] list, but the DAG have a schedule_interval which is not None.
+        Raise exception if there are any Params which can not be resolved by their schema definition.
         """
         if not self.timetable.can_be_scheduled:
             return
 
-        for v in self.params.values():
-            # As type can be an array, we would check if `null` is an allowed type or not
-            if not v.has_value and ("type" not in v.schema or "null" not in v.schema["type"]):
-                raise AirflowException(
-                    "DAG Schedule must be None, if there are any required params without default values"
-                )
+        try:
+            self.params.validate()
+        except ParamValidationError as pverr:
+            raise AirflowException(
+                "DAG is not allowed to define a Schedule, "
+                "if there are any required params without default values or default values are not valid."
+            ) from pverr
 
     def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]:
         """

Reply via email to