This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ae4e37eb3 Hybrid-ize tasks run_task executor entrypoint (#40762)
5ae4e37eb3 is described below
commit 5ae4e37eb33397f84835c183547aa03230a530f5
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Jul 12 22:32:56 2024 -0700
Hybrid-ize tasks run_task executor entrypoint (#40762)
There is an entrypoint to running tasks on executors other than the
backfill and scheduler jobs, that is the cli command tasks run_task. If
neither --local or --raw are provided, an executor instance is created
to run the task. Before this change, that was always the default
executor. This change updates that logic to check if the task instance
has been configured to run on a specific executor, if so, load that
executor to run the task instead of the default.
---
airflow/cli/commands/task_command.py | 5 ++-
tests/cli/commands/test_task_command.py | 60 ++++++++++++++++++++++++++++++++-
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index caced495be..91f64c7cf4 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -278,7 +278,10 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
print("Could not pickle the DAG")
print(e)
raise e
- executor = ExecutorLoader.get_default_executor()
+ if ti.executor:
+ executor = ExecutorLoader.load_executor(ti.executor)
+ else:
+ executor = ExecutorLoader.get_default_executor()
executor.job_id = None
executor.start()
print("Sending to executor.")
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 1e7c29cca2..13ea381e60 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -26,6 +26,7 @@ import shutil
import sys
from argparse import ArgumentParser
from contextlib import contextmanager, redirect_stdout
+from importlib import reload
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING
@@ -41,9 +42,11 @@ from airflow.cli.commands import task_command
from airflow.cli.commands.task_command import LoggerMutationHelper
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound
+from airflow.executors.local_executor import LocalExecutor
from airflow.models import DagBag, DagRun, Pool, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.bash import BashOperator
+from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
@@ -179,7 +182,7 @@ class TestCliTasks:
def test_cli_test_different_path(self, session, tmp_path):
"""
- When thedag processor has a different dags folder
+ When the dag processor has a different dags folder
from the worker, ``airflow tasks run --local`` should still work.
"""
repo_root = Path(__file__).parents[3]
@@ -452,6 +455,61 @@ class TestCliTasks:
)
)
+ def test_cli_run_no_local_no_raw_runs_executor(self, dag_maker):
+ from airflow.cli.commands import task_command
+
+ with dag_maker(dag_id="test_executor", schedule="@daily") as dag:
+ with mock.patch(
+ "airflow.executors.executor_loader.ExecutorLoader.load_executor"
+ ) as loader_mock, mock.patch(
+ "airflow.executors.executor_loader.ExecutorLoader.get_default_executor"
+ ) as get_default_mock:
+ EmptyOperator(task_id="task1")
+ EmptyOperator(task_id="task2", executor="foo_executor_alias")
+
+ dag_maker.create_dagrun()
+
+ # Reload module to consume newly mocked executor loader
+ reload(task_command)
+
+ loader_mock.return_value = LocalExecutor()
+ get_default_mock.return_value = LocalExecutor()
+
+ # In the task1 case we will use the default executor
+ task_command.task_run(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "run",
+ "test_executor",
+ "task1",
+ DEFAULT_DATE.isoformat(),
+ ]
+ ),
+ dag,
+ )
+ get_default_mock.assert_called_once()
+ loader_mock.assert_not_called()
+
+ # In the task2 case we will use the executor configured on the task
+ task_command.task_run(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "run",
+ "test_executor",
+ "task2",
+ DEFAULT_DATE.isoformat(),
+ ]
+ ),
+ dag,
+ )
+ get_default_mock.assert_called_once()  # Call from previous task
+ loader_mock.assert_called_once_with("foo_executor_alias")
+
+ # Reload module to remove mocked version of executor loader
+ reload(task_command)
+
def test_task_render(self):
"""
tasks render should render and displays templated fields for a given task