This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6f112cfbc1 Improve Code Coverage for Base Executor (#35260)
6f112cfbc1 is described below
commit 6f112cfbc1091ccde518aed2a6c99f65a75e9dc2
Author: Owen Leung <[email protected]>
AuthorDate: Mon Oct 30 20:07:57 2023 +0800
Improve Code Coverage for Base Executor (#35260)
---
scripts/cov/core_coverage.py | 1 -
tests/executors/test_base_executor.py | 126 ++++++++++++++++++++++++++++++++++
2 files changed, 126 insertions(+), 1 deletion(-)
diff --git a/scripts/cov/core_coverage.py b/scripts/cov/core_coverage.py
index ba77c35fd1..38d001d2db 100644
--- a/scripts/cov/core_coverage.py
+++ b/scripts/cov/core_coverage.py
@@ -35,7 +35,6 @@ source_files = [
files_not_fully_covered = [
# executors
- "airflow/executors/base_executor.py",
"airflow/executors/debug_executor.py",
"airflow/executors/executor_loader.py",
"airflow/executors/local_executor.py",
diff --git a/tests/executors/test_base_executor.py
b/tests/executors/test_base_executor.py
index 7c21949b53..c990d88726 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations
+import logging
+import sys
from datetime import timedelta
from unittest import mock
@@ -24,6 +26,9 @@ import pendulum
import pytest
import time_machine
+from airflow.callbacks.callback_requests import CallbackRequest
+from airflow.cli.cli_config import DefaultHelpParser, GroupCommand
+from airflow.cli.cli_parser import AirflowHelpFormatter
from airflow.executors.base_executor import BaseExecutor,
RunningRetryAttemptType
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -51,6 +56,11 @@ def test_is_production_default_value():
assert BaseExecutor.is_production
+def test_infinite_slotspool():
+ executor = BaseExecutor(0)
+ assert executor.slots_available == sys.maxsize
+
+
def test_get_task_log():
executor = BaseExecutor()
ti = TaskInstance(task=BaseOperator(task_id="dummy"))
@@ -83,6 +93,24 @@ def test_get_event_buffer():
assert len(executor.event_buffer) == 0
+def test_fail_and_success():
+ executor = BaseExecutor()
+
+ date = timezone.utcnow()
+ try_number = 1
+ success_state = State.SUCCESS
+ fail_state = State.FAILED
+ key1 = TaskInstanceKey("my_dag1", "my_task1", date, try_number)
+ key2 = TaskInstanceKey("my_dag2", "my_task1", date, try_number)
+ key3 = TaskInstanceKey("my_dag2", "my_task2", date, try_number)
+ executor.fail(key1, fail_state)
+ executor.fail(key2, fail_state)
+ executor.success(key3, success_state)
+
+ assert len(executor.running) == 0
+ assert len(executor.get_event_buffer()) == 3
+
+
@mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.Stats.gauge")
@@ -99,6 +127,22 @@ def test_gauge_executor_metrics(mock_stats_gauge,
mock_trigger_tasks, mock_sync)
mock_stats_gauge.assert_has_calls(calls)
[email protected]("airflow.executors.base_executor.BaseExecutor.sync")
[email protected]("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
[email protected]("airflow.executors.base_executor.Stats.gauge")
+def test_gauge_executor_with_infinite_pool_metrics(mock_stats_gauge,
mock_trigger_tasks, mock_sync):
+ executor = BaseExecutor(0)
+ executor.heartbeat()
+ calls = [
+ mock.call("executor.open_slots", value=mock.ANY, tags={"status":
"open", "name": "BaseExecutor"}),
+ mock.call("executor.queued_tasks", value=mock.ANY, tags={"status":
"queued", "name": "BaseExecutor"}),
+ mock.call(
+ "executor.running_tasks", value=mock.ANY, tags={"status":
"running", "name": "BaseExecutor"}
+ ),
+ ]
+ mock_stats_gauge.assert_has_calls(calls)
+
+
def setup_dagrun(dag_maker):
date = timezone.utcnow()
start_date = date - timedelta(days=2)
@@ -200,10 +244,92 @@ def test_trigger_running_tasks(can_try_mock, dag_maker,
can_try_num, change_stat
def test_validate_airflow_tasks_run_command(dag_maker):
dagrun = setup_dagrun(dag_maker)
tis = dagrun.task_instances
+ print(f"command: {tis[0].command_as_list()}")
dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+ print(f"dag_id: {dag_id}, task_id: {task_id}")
assert dag_id == dagrun.dag_id and task_id == tis[0].task_id
[email protected](
+ "airflow.models.taskinstance.TaskInstance.generate_command",
+ return_value=["airflow", "tasks", "run", "--test_dag", "--test_task"],
+)
+def
test_validate_airflow_tasks_run_command_with_complete_forloop(generate_command_mock,
dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ tis = dagrun.task_instances
+ dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+ assert dag_id is None and task_id is None
+
+
[email protected](
+ "airflow.models.taskinstance.TaskInstance.generate_command",
return_value=["airflow", "task", "run"]
+)
+def test_invalid_airflow_tasks_run_command(generate_command_mock, dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ tis = dagrun.task_instances
+ with pytest.raises(ValueError):
+
BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+
+
[email protected](
+ "airflow.models.taskinstance.TaskInstance.generate_command",
return_value=["airflow", "tasks", "run"]
+)
+def test_empty_airflow_tasks_run_command(generate_command_mock, dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ tis = dagrun.task_instances
+ dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+ assert dag_id is None, task_id is None
+
+
+def test_deprecate_validate_api(dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ tis = dagrun.task_instances
+ with pytest.warns(DeprecationWarning):
+ BaseExecutor.validate_command(tis[0].command_as_list())
+
+
+def test_debug_dump(caplog):
+ executor = BaseExecutor()
+ with caplog.at_level(logging.INFO):
+ executor.debug_dump()
+ assert "executor.queued" in caplog.text
+ assert "executor.running" in caplog.text
+ assert "executor.event_buffer" in caplog.text
+
+
+def test_base_executor_cannot_send_callback():
+ cbr = CallbackRequest("some_file_path_for_callback")
+ executor = BaseExecutor()
+ with pytest.raises(ValueError):
+ executor.send_callback(cbr)
+
+
+def test_parser_and_formatter_class():
+ executor = BaseExecutor()
+ parser = executor._get_parser()
+ assert isinstance(parser, DefaultHelpParser)
+ assert parser.formatter_class is AirflowHelpFormatter
+
+
[email protected]("airflow.cli.cli_parser._add_command")
[email protected](
+ "airflow.executors.base_executor.BaseExecutor.get_cli_commands",
+ return_value=[
+ GroupCommand(
+ name="some_name",
+ help="some_help",
+ subcommands=["A", "B", "C"],
+ description="some_description",
+ epilog="some_epilog",
+ )
+ ],
+)
+def test_parser_add_command(mock_add_command, mock_get_cli_command):
+ executor = BaseExecutor()
+ executor._get_parser()
+ mock_add_command.assert_called_once()
+
+
@pytest.mark.parametrize("loop_duration, total_tries", [(0.5, 12), (1.0, 7),
(1.7, 4), (10, 2)])
def test_running_retry_attempt_type(loop_duration, total_tries):
"""