This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ce7f043e15 Consolidate the call of change_state to fail or success in the core executors (#35901)
ce7f043e15 is described below
commit ce7f043e15534c3d9ba6d59c3bb6b851e36a60b9
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Nov 28 20:14:13 2023 +0200
Consolidate the call of change_state to fail or success in the core executors (#35901)
---
airflow/executors/debug_executor.py | 6 +++---
airflow/executors/sequential_executor.py | 5 ++---
tests/executors/test_debug_executor.py | 4 ++--
3 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index b601c2b7c9..bb5f46b1f7 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -74,7 +74,7 @@ class DebugExecutor(BaseExecutor):
elif self._terminated.is_set():
self.log.info("Executor is terminated! Stopping %s to %s",
ti.key, TaskInstanceState.FAILED)
ti.set_state(TaskInstanceState.FAILED)
- self.change_state(ti.key, TaskInstanceState.FAILED)
+ self.fail(ti.key)
else:
task_succeeded = self._run_task(ti)
@@ -84,11 +84,11 @@ class DebugExecutor(BaseExecutor):
try:
params = self.tasks_params.pop(ti.key, {})
ti.run(job_id=ti.job_id, **params)
- self.change_state(key, TaskInstanceState.SUCCESS)
+ self.success(key)
return True
except Exception as e:
ti.set_state(TaskInstanceState.FAILED)
- self.change_state(key, TaskInstanceState.FAILED)
+ self.fail(key)
self.log.exception("Failed to execute task: %s.", e)
return False
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 8ea3e42dc5..227bf879f3 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -28,7 +28,6 @@ import subprocess
from typing import TYPE_CHECKING, Any
from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
@@ -75,9 +74,9 @@ class SequentialExecutor(BaseExecutor):
try:
subprocess.check_call(command, close_fds=True)
- self.change_state(key, TaskInstanceState.SUCCESS)
+ self.success(key)
except subprocess.CalledProcessError as e:
- self.change_state(key, TaskInstanceState.FAILED)
+ self.fail(key)
self.log.error("Failed to execute task %s.", e)
self.commands_to_run = []
diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py
index 03a91f9c92..20ee821842 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -111,7 +111,7 @@ class TestDebugExecutor:
assert not executor.tasks_to_run
change_state_mock.assert_has_calls(
[
- mock.call(ti1.key, State.FAILED),
+ mock.call(ti1.key, State.FAILED, None),
mock.call(ti2.key, State.UPSTREAM_FAILED),
]
)
@@ -145,6 +145,6 @@ class TestDebugExecutor:
change_state_mock.assert_has_calls(
[
- mock.call(ti1.key, State.FAILED),
+ mock.call(ti1.key, State.FAILED, None),
]
)