o-nikolas commented on code in PR #40030:
URL: https://github.com/apache/airflow/pull/40030#discussion_r1625345239
##########
tests/models/test_dag.py:
##########
@@ -2623,6 +2624,29 @@ def test_replace_outdated_access_control_actions(self):
dag.access_control = outdated_permissions
assert dag.access_control == updated_permissions
+ def test_validate_executor_field_executor_not_configured(self):
+ dag = DAG(
+ "test-dag",
+ schedule=None,
+ )
+
+ EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
+ with pytest.raises(
+ ValueError, match="The specified executor test.custom.executor for task t1 is not configured"
+ ):
+ dag.validate()
+
+ def test_validate_executor_field(self):
+ with patch.object(ExecutorLoader, "lookup_executor_name_by_str") as mock_get_executor_names:
+ dag = DAG(
+ "test-dag",
+ schedule=None,
+ )
+
+ mock_get_executor_names.return_value = None
Review Comment:
Do you need to specify `None` as the return value here? By default the mock
returns another mock object when it's called, which should do just as well,
no?
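
To illustrate the default behaviour, here is a minimal standalone sketch. `FakeExecutorLoader` and `validate` are hypothetical stand-ins (not Airflow code) so the snippet runs without Airflow installed; only `unittest.mock` is real API here.

```python
from unittest.mock import MagicMock, patch


class FakeExecutorLoader:
    """Hypothetical stand-in for ExecutorLoader, used only for this illustration."""

    @classmethod
    def lookup_executor_name_by_str(cls, name):
        raise RuntimeError(f"executor {name} is not configured")


def validate(executor_name):
    # Mirrors the shape of validate_executor_field: only a raised exception
    # matters, the return value of the lookup is ignored.
    FakeExecutorLoader.lookup_executor_name_by_str(executor_name)
    return True


with patch.object(FakeExecutorLoader, "lookup_executor_name_by_str") as mock_lookup:
    # No return_value is configured, so the call returns an auto-created MagicMock.
    result = FakeExecutorLoader.lookup_executor_name_by_str("test.custom.executor")
    validated = validate("test.custom.executor")

default_return_is_mock = isinstance(result, MagicMock)
call_count = mock_lookup.call_count
```

Since the code under test never inspects the return value, leaving `return_value` unset should behave the same as setting it to `None`.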
##########
airflow/models/dag.py:
##########
@@ -800,10 +801,21 @@ def validate(self):
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
+ self.validate_executor_field()
self.validate_schedule_and_params()
self.timetable.validate()
self.validate_setup_teardown()
+ def validate_executor_field(self):
+ for task in self.tasks:
+ if task.executor:
+ try:
+ ExecutorLoader.lookup_executor_name_by_str(task.executor)
+ except AirflowException:
Review Comment:
I agree that re-raising a precise exception here is best, since we have more
context. Plus, I think the lookup function is meant to be generic, so it makes
sense for its exception to stay more general.
Though I do think an ExecutorException is fine if we are worried about
AirflowException (but I don't personally like creating custom exceptions very
often).
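
A rough sketch of the pattern, for reference. Everything here is a hypothetical stand-in so it runs on its own: `GenericLookupError` plays the role of AirflowException, the lookup is a toy function, and tasks are simplified to `(task_id, executor)` pairs. The `from err` chaining is my addition and is not in the PR.

```python
class GenericLookupError(Exception):
    """Hypothetical stand-in for AirflowException."""


def lookup_executor_name_by_str(name, configured=("LocalExecutor",)):
    # Hypothetical generic lookup: it knows nothing about DAGs or tasks.
    if name not in configured:
        raise GenericLookupError(f"Unknown executor: {name}")
    return name


def validate_executor_field(tasks):
    # tasks: iterable of (task_id, executor) pairs, a simplification of DAG tasks.
    for task_id, executor in tasks:
        if not executor:
            continue
        try:
            lookup_executor_name_by_str(executor)
        except GenericLookupError as err:
            # Re-raise with task-level context; "from err" keeps the original cause.
            raise ValueError(
                f"The specified executor {executor} for task {task_id} is not configured"
            ) from err


try:
    validate_executor_field([("t1", "test.custom.executor")])
except ValueError as exc:
    message = str(exc)
    cause = exc.__cause__
```

The generic lookup keeps raising its general error, while the caller that knows the task adds the specifics.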
##########
airflow/models/dag.py:
##########
@@ -800,10 +801,21 @@ def validate(self):
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
+ self.validate_executor_field()
self.validate_schedule_and_params()
self.timetable.validate()
self.validate_setup_teardown()
+ def validate_executor_field(self):
+ for task in self.tasks:
+ if task.executor:
+ try:
+ ExecutorLoader.lookup_executor_name_by_str(task.executor)
+ except AirflowException:
+ raise ValueError(
+ f"The specified executor {task.executor} for task {task.task_id} is not configured"
Review Comment:
```suggestion
f"The specified executor {task.executor} for task {task.task_id} is not configured. Review the core.executors Airflow configuration to add it or update the executor configuration for this task."
```
What do you think about this update, which adds some hints to the user on
how to rectify the situation?
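
As far as I can tell the existing test should keep passing with the longer message, since the `match` argument of `pytest.raises` is applied with `re.search` rather than a full match. A small sketch using plain `re` (so it runs without pytest), with the message text copied from the suggestion above:

```python
import re

# Pattern currently used in test_validate_executor_field_executor_not_configured.
pattern = "The specified executor test.custom.executor for task t1 is not configured"

# Message as it would read with the suggested hint appended.
extended_message = (
    "The specified executor test.custom.executor for task t1 is not configured. "
    "Review the core.executors Airflow configuration to add it or update the "
    "executor configuration for this task."
)

# re.search only needs the pattern to occur somewhere in the message.
still_matches = re.search(pattern, extended_message) is not None
```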