This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ae4e37eb3 Hybrid-ize tasks run_task executor entrypoint (#40762)
5ae4e37eb3 is described below

commit 5ae4e37eb33397f84835c183547aa03230a530f5
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Jul 12 22:32:56 2024 -0700

    Hybrid-ize tasks run_task executor entrypoint (#40762)
    
    There is an entrypoint to running tasks on executors other than the
    backfill and scheduler jobs, that is the cli command tasks run_task. If
    neither --local or --raw are provided, an executor instance is created
    to run the task. Before this change, that was always the default
    executor. This change updates that logic to check if the task instance
    has been configured to run on a specific executor, if so, load that
    executor to run the task instead of the default.
---
 airflow/cli/commands/task_command.py    |  5 ++-
 tests/cli/commands/test_task_command.py | 60 ++++++++++++++++++++++++++++++++-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index caced495be..91f64c7cf4 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -278,7 +278,10 @@ def _run_task_by_executor(args, dag: DAG, ti: 
TaskInstance) -> None:
             print("Could not pickle the DAG")
             print(e)
             raise e
-    executor = ExecutorLoader.get_default_executor()
+    if ti.executor:
+        executor = ExecutorLoader.load_executor(ti.executor)
+    else:
+        executor = ExecutorLoader.get_default_executor()
     executor.job_id = None
     executor.start()
     print("Sending to executor.")
diff --git a/tests/cli/commands/test_task_command.py 
b/tests/cli/commands/test_task_command.py
index 1e7c29cca2..13ea381e60 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -26,6 +26,7 @@ import shutil
 import sys
 from argparse import ArgumentParser
 from contextlib import contextmanager, redirect_stdout
+from importlib import reload
 from io import StringIO
 from pathlib import Path
 from typing import TYPE_CHECKING
@@ -41,9 +42,11 @@ from airflow.cli.commands import task_command
 from airflow.cli.commands.task_command import LoggerMutationHelper
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagRunNotFound
+from airflow.executors.local_executor import LocalExecutor
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.bash import BashOperator
+from airflow.operators.empty import EmptyOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State, TaskInstanceState
@@ -179,7 +182,7 @@ class TestCliTasks:
 
     def test_cli_test_different_path(self, session, tmp_path):
         """
-        When thedag processor has a different dags folder
+        When the dag processor has a different dags folder
         from the worker, ``airflow tasks run --local`` should still work.
         """
         repo_root = Path(__file__).parents[3]
@@ -452,6 +455,61 @@ class TestCliTasks:
                 )
             )
 
+    def test_cli_run_no_local_no_raw_runs_executor(self, dag_maker):
+        from airflow.cli.commands import task_command
+
+        with dag_maker(dag_id="test_executor", schedule="@daily") as dag:
+            with mock.patch(
+                
"airflow.executors.executor_loader.ExecutorLoader.load_executor"
+            ) as loader_mock, mock.patch(
+                
"airflow.executors.executor_loader.ExecutorLoader.get_default_executor"
+            ) as get_default_mock:
+                EmptyOperator(task_id="task1")
+                EmptyOperator(task_id="task2", executor="foo_executor_alias")
+
+                dag_maker.create_dagrun()
+
+                # Reload module to consume newly mocked executor loader
+                reload(task_command)
+
+                loader_mock.return_value = LocalExecutor()
+                get_default_mock.return_value = LocalExecutor()
+
+                # In the task1 case we will use the default executor
+                task_command.task_run(
+                    self.parser.parse_args(
+                        [
+                            "tasks",
+                            "run",
+                            "test_executor",
+                            "task1",
+                            DEFAULT_DATE.isoformat(),
+                        ]
+                    ),
+                    dag,
+                )
+                get_default_mock.assert_called_once()
+                loader_mock.assert_not_called()
+
+                # In the task2 case we will use the executor configured on the 
task
+                task_command.task_run(
+                    self.parser.parse_args(
+                        [
+                            "tasks",
+                            "run",
+                            "test_executor",
+                            "task2",
+                            DEFAULT_DATE.isoformat(),
+                        ]
+                    ),
+                    dag,
+                )
+                get_default_mock.assert_called_once()  # Call from previous 
task
+                loader_mock.assert_called_once_with("foo_executor_alias")
+
+        # Reload module to remove mocked version of executor loader
+        reload(task_command)
+
     def test_task_render(self):
         """
         tasks render should render and displays templated fields for a given 
task

Reply via email to