This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d504bfa5d1 Aip-61: Add validation on task `executor` field (#40030)
d504bfa5d1 is described below
commit d504bfa5d148e0d9470375c1c9d35c117deebc31
Author: Syed Hussain <[email protected]>
AuthorDate: Mon Jun 10 10:18:55 2024 -0700
Aip-61: Add validation on task `executor` field (#40030)
---
airflow/executors/executor_loader.py | 4 ++--
airflow/models/dag.py | 14 ++++++++++++++
tests/models/test_dag.py | 23 +++++++++++++++++++++++
3 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index b8b886954b..d433ef183e 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -25,7 +25,7 @@ from contextlib import suppress
from typing import TYPE_CHECKING
from airflow.api_internal.internal_api_call import InternalApiConfig
-from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.exceptions import AirflowConfigException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
@@ -206,7 +206,7 @@ class ExecutorLoader:
elif executor_name := _classname_to_executors.get(executor_name_str):
return executor_name
else:
- raise AirflowException(f"Unknown executor being loaded: {executor_name_str}")
+ raise ValueError(f"Unknown executor being loaded: {executor_name_str}")
@classmethod
def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b91c08317f..1bf812abda 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -94,6 +94,7 @@ from airflow.exceptions import (
TaskDeferred,
TaskNotFound,
)
+from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import run_job
from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback
from airflow.models.base import Base, StringID
@@ -800,10 +801,23 @@ class DAG(LoggingMixin):
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
+ self.validate_executor_field()
self.validate_schedule_and_params()
self.timetable.validate()
self.validate_setup_teardown()
+ def validate_executor_field(self):
+ for task in self.tasks:
+ if task.executor:
+ try:
+ ExecutorLoader.lookup_executor_name_by_str(task.executor)
+ except ValueError:
+ raise ValueError(
+ f"The specified executor {task.executor} for task {task.task_id} is not "
+ "configured. Review the core.executors Airflow configuration to add it or "
+ "update the executor configuration for this task."
+ )
+
def validate_setup_teardown(self):
"""
Validate that setup and teardown tasks are configured properly.
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9207b3557c..6d931ffbdc 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -59,6 +59,7 @@ from airflow.models.dag import (
DagModel,
DagOwnerAttributes,
DagTag,
+ ExecutorLoader,
dag as dag_decorator,
get_dataset_triggered_next_run_info,
)
@@ -2761,6 +2762,28 @@ my_postgres_conn:
dag.access_control = outdated_permissions
assert dag.access_control == updated_permissions
+ def test_validate_executor_field_executor_not_configured(self):
+ dag = DAG(
+ "test-dag",
+ schedule=None,
+ )
+
+ EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
+ with pytest.raises(
+ ValueError, match="The specified executor test.custom.executor for task t1 is not configured"
+ ):
+ dag.validate()
+
+ def test_validate_executor_field(self):
+ with patch.object(ExecutorLoader, "lookup_executor_name_by_str"):
+ dag = DAG(
+ "test-dag",
+ schedule=None,
+ )
+
+ EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
+ dag.validate()
+
def test_validate_params_on_trigger_dag(self):
dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):