This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d504bfa5d1 Aip-61: Add validation on task `executor` field  (#40030)
d504bfa5d1 is described below

commit d504bfa5d148e0d9470375c1c9d35c117deebc31
Author: Syed Hussain <[email protected]>
AuthorDate: Mon Jun 10 10:18:55 2024 -0700

    Aip-61: Add validation on task `executor` field  (#40030)
---
 airflow/executors/executor_loader.py |  4 ++--
 airflow/models/dag.py                | 14 ++++++++++++++
 tests/models/test_dag.py             | 23 +++++++++++++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/executor_loader.py 
b/airflow/executors/executor_loader.py
index b8b886954b..d433ef183e 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -25,7 +25,7 @@ from contextlib import suppress
 from typing import TYPE_CHECKING
 
 from airflow.api_internal.internal_api_call import InternalApiConfig
-from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.exceptions import AirflowConfigException
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
     CELERY_KUBERNETES_EXECUTOR,
@@ -206,7 +206,7 @@ class ExecutorLoader:
         elif executor_name := _classname_to_executors.get(executor_name_str):
             return executor_name
         else:
-            raise AirflowException(f"Unknown executor being loaded: 
{executor_name_str}")
+            raise ValueError(f"Unknown executor being loaded: 
{executor_name_str}")
 
     @classmethod
     def load_executor(cls, executor_name: ExecutorName | str | None) -> 
BaseExecutor:
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b91c08317f..1bf812abda 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -94,6 +94,7 @@ from airflow.exceptions import (
     TaskDeferred,
     TaskNotFound,
 )
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.jobs.job import run_job
 from airflow.models.abstractoperator import AbstractOperator, 
TaskStateChangeCallback
 from airflow.models.base import Base, StringID
@@ -800,10 +801,33 @@ class DAG(LoggingMixin):
                 f"inconsistent schedule: timetable {self.timetable.summary!r} "
                 f"does not match schedule_interval {self.schedule_interval!r}",
             )
+        self.validate_executor_field()
         self.validate_schedule_and_params()
         self.timetable.validate()
         self.validate_setup_teardown()
 
+    def validate_executor_field(self):
+        """
+        Validate that each task's ``executor`` field names an executor ExecutorLoader can resolve.
+
+        Tasks that do not set ``executor`` are skipped.
+
+        :raises ValueError: if ``ExecutorLoader.lookup_executor_name_by_str`` cannot
+            resolve a task's ``executor`` value.
+        """
+        for task in self.tasks:
+            if task.executor:
+                try:
+                    ExecutorLoader.lookup_executor_name_by_str(task.executor)
+                except ValueError as err:
+                    # Re-raise with a task-specific, actionable message, keeping the loader's
+                    # original error chained as ``__cause__`` so the root cause stays visible.
+                    raise ValueError(
+                        f"The specified executor {task.executor} for task {task.task_id} is not "
+                        "configured. Review the core.executors Airflow configuration to add it or "
+                        "update the executor configuration for this task."
+                    ) from err
+
     def validate_setup_teardown(self):
         """
         Validate that setup and teardown tasks are configured properly.
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9207b3557c..6d931ffbdc 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -59,6 +59,7 @@ from airflow.models.dag import (
     DagModel,
     DagOwnerAttributes,
     DagTag,
+    ExecutorLoader,
     dag as dag_decorator,
     get_dataset_triggered_next_run_info,
 )
@@ -2761,6 +2762,32 @@ my_postgres_conn:
             dag.access_control = outdated_permissions
         assert dag.access_control == updated_permissions
 
+    def test_validate_executor_field_executor_not_configured(self):
+        """A task executor that ExecutorLoader cannot resolve makes ``DAG.validate`` raise ValueError."""
+        dag = DAG(
+            "test-dag",
+            schedule=None,
+        )
+
+        EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
+        with pytest.raises(
+            ValueError, match="The specified executor test.custom.executor for task t1 is not configured"
+        ):
+            dag.validate()
+
+    def test_validate_executor_field(self):
+        """A resolvable task executor passes validation and is actually looked up via ExecutorLoader."""
+        with patch.object(ExecutorLoader, "lookup_executor_name_by_str") as mock_lookup:
+            dag = DAG(
+                "test-dag",
+                schedule=None,
+            )
+
+            EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
+            dag.validate()
+
+        mock_lookup.assert_called_once_with("test.custom.executor")
+
     def test_validate_params_on_trigger_dag(self):
         dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
         with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):

Reply via email to