This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 700b43c0731 Refactor Kubernetes Executor Types to NamedTuples (#54858)
700b43c0731 is described below
commit 700b43c0731c8c6c90604c1c3066fd30004b50dd
Author: Gary Hsu <[email protected]>
AuthorDate: Wed Aug 27 16:58:31 2025 +0800
Refactor Kubernetes Executor Types to NamedTuples (#54858)
---
.../kubernetes_tests/test_kubernetes_executor.py | 31 ++++++++---
.../kubernetes/executors/kubernetes_executor.py | 51 +++++++++---------
.../executors/kubernetes_executor_types.py | 54 +++++++++++++------
.../executors/kubernetes_executor_utils.py | 60 +++++++++++++---------
.../executors/test_kubernetes_executor.py | 36 +++++++++----
5 files changed, 149 insertions(+), 83 deletions(-)
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
index 4d14c75864d..f7184acaab6 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
@@ -24,6 +24,7 @@ import pytest
if TYPE_CHECKING:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import FailureDetails
+from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import KubernetesResults
from kubernetes_tests.test_base import (
EXECUTOR,
BaseK8STest, # isort:skip (needed to workaround isort bug)
@@ -138,15 +139,19 @@ class TestKubernetesExecutor(BaseK8STest):
# Create a test task key
task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task",
run_id="test_run", try_number=1)
- # Call _change_state with FAILED status and failure details
- executor._change_state(
+ # Create KubernetesResults object
+ results = KubernetesResults(
key=task_key,
state=TaskInstanceState.FAILED,
pod_name="test-pod",
namespace="test-namespace",
+ resource_version="123",
failure_details=failure_details,
)
+ # Call _change_state with KubernetesResults object
+ executor._change_state(results)
+
# Verify that the warning log was called with expected parameters
mock_log.warning.assert_called_once_with(
"Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message:
%s, "
@@ -181,15 +186,19 @@ class TestKubernetesExecutor(BaseK8STest):
# Create a test task key
task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task",
run_id="test_run", try_number=1)
- # Call _change_state with FAILED status but no failure details
- executor._change_state(
+ # Create KubernetesResults object without failure details
+ results = KubernetesResults(
key=task_key,
state=TaskInstanceState.FAILED,
pod_name="test-pod",
namespace="test-namespace",
+ resource_version="123",
failure_details=None,
)
+ # Call _change_state with KubernetesResults object
+ executor._change_state(results)
+
# Verify that the warning log was called with the correct parameters
mock_log.warning.assert_called_once_with(
"Task %s failed in pod %s/%s (no details available)",
@@ -214,11 +223,19 @@ class TestKubernetesExecutor(BaseK8STest):
# Create a test task key
task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task",
run_id="test_run", try_number=1)
- # Call _change_state with SUCCESS status
- executor._change_state(
- key=task_key, state=TaskInstanceState.SUCCESS,
pod_name="test-pod", namespace="test-namespace"
+ # Create KubernetesResults object with SUCCESS state
+ results = KubernetesResults(
+ key=task_key,
+ state=TaskInstanceState.SUCCESS,
+ pod_name="test-pod",
+ namespace="test-namespace",
+ resource_version="123",
+ failure_details=None,
)
+ # Call _change_state with KubernetesResults object
+ executor._change_state(results)
+
# Verify that no failure logs were called
mock_log.error.assert_not_called()
mock_log.warning.assert_not_called()
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index d2f6c1b5df6..ee5dfa9db73 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -66,7 +66,8 @@ from airflow.providers.cncf.kubernetes.exceptions import
PodMutationHookExceptio
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
- FailureDetails,
+ KubernetesJob,
+ KubernetesResults,
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
annotations_to_key
@@ -86,10 +87,6 @@ if TYPE_CHECKING:
from airflow.executors import workloads
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
- from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import (
- KubernetesJobType,
- KubernetesResultsType,
- )
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils
import (
AirflowKubernetesScheduler,
)
@@ -157,8 +154,8 @@ class KubernetesExecutor(BaseExecutor):
def __init__(self):
self.kube_config = KubeConfig()
self._manager = multiprocessing.Manager()
- self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
- self.result_queue: Queue[KubernetesResultsType] = self._manager.Queue()
+ self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
+ self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
self.kube_scheduler: AirflowKubernetesScheduler | None = None
self.kube_client: client.CoreV1Api | None = None
self.scheduler_job_id: str | None = None
@@ -280,7 +277,7 @@ class KubernetesExecutor(BaseExecutor):
else:
pod_template_file = None
self.event_buffer[key] = (TaskInstanceState.QUEUED,
self.scheduler_job_id)
- self.task_queue.put((key, command, kube_executor_config,
pod_template_file))
+ self.task_queue.put(KubernetesJob(key, command, kube_executor_config,
pod_template_file))
# We keep a temporary local record that we've handled this so we don't
# try and remove it from the QUEUED state while we process it
self.last_handled[key] = time.time()
@@ -331,17 +328,16 @@ class KubernetesExecutor(BaseExecutor):
while True:
results = self.result_queue.get_nowait()
try:
- key, state, pod_name, namespace, resource_version,
failure_details = results
- last_resource_version[namespace] = resource_version
- self.log.info("Changing state of %s to %s", results, state)
+ last_resource_version[results.namespace] =
results.resource_version
+ self.log.info("Changing state of %s to %s", results,
results.state)
try:
- self._change_state(key, state, pod_name, namespace,
failure_details)
+ self._change_state(results)
except Exception as e:
self.log.exception(
"Exception: %s when attempting to change state of
%s to %s, re-queueing.",
e,
results,
- state,
+ results.state,
)
self.result_queue.put(results)
finally:
@@ -362,7 +358,7 @@ class KubernetesExecutor(BaseExecutor):
task = self.task_queue.get_nowait()
try:
- key, command, kube_executor_config, pod_template_file =
task
+ key = task.key
self.kube_scheduler.run_next(task)
self.task_publish_retries.pop(key, None)
except PodReconciliationError as e:
@@ -391,11 +387,11 @@ class KubernetesExecutor(BaseExecutor):
self.task_publish_retries[key] = retries + 1
else:
self.log.error("Pod creation failed with reason %r.
Failing task", e.reason)
- key, _, _, _ = task
+ key = task.key
self.fail(key, e)
self.task_publish_retries.pop(key, None)
except PodMutationHookException as e:
- key, _, _, _ = task
+ key = task.key
self.log.error(
"Pod Mutation Hook failed for the task %s. Failing
task. Details: %s",
key,
@@ -408,16 +404,19 @@ class KubernetesExecutor(BaseExecutor):
@provide_session
def _change_state(
self,
- key: TaskInstanceKey,
- state: TaskInstanceState | str | None,
- pod_name: str,
- namespace: str,
- failure_details: FailureDetails | None = None,
+ results: KubernetesResults,
session: Session = NEW_SESSION,
) -> None:
+ """Change state of the task based on KubernetesResults."""
if TYPE_CHECKING:
assert self.kube_scheduler
+ key = results.key
+ state = results.state
+ pod_name = results.pod_name
+ namespace = results.namespace
+ failure_details = results.failure_details
+
if state == TaskInstanceState.FAILED:
# Use pre-collected failure details from the watcher to avoid
additional API calls
if failure_details:
@@ -734,18 +733,20 @@ class KubernetesExecutor(BaseExecutor):
results = self.result_queue.get_nowait()
self.log.warning("Executor shutting down, flushing
results=%s", results)
try:
- key, state, pod_name, namespace, resource_version,
failure_details = results
self.log.info(
- "Changing state of %s to %s : resource_version=%d",
results, state, resource_version
+ "Changing state of %s to %s : resource_version=%s",
+ results,
+ results.state,
+ results.resource_version,
)
try:
- self._change_state(key, state, pod_name, namespace,
failure_details)
+ self._change_state(results)
except Exception as e:
self.log.exception(
"Ignoring exception: %s when attempting to change
state of %s to %s.",
e,
results,
- state,
+ results.state,
)
finally:
self.result_queue.task_done()
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
index a475959e588..f8e03f1f04c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
@@ -16,7 +16,14 @@
# under the License.
from __future__ import annotations
-from typing import TYPE_CHECKING, Any, Literal, TypedDict
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+ from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.utils.state import TaskInstanceState
+
ADOPTED = "adopted"
@@ -35,27 +42,40 @@ class FailureDetails(TypedDict, total=False):
container_name: str | None
-if TYPE_CHECKING:
- from collections.abc import Sequence
+class KubernetesResults(NamedTuple):
+ """Results from Kubernetes task execution."""
- from airflow.models.taskinstance import TaskInstanceKey
- from airflow.utils.state import TaskInstanceState
+ key: TaskInstanceKey
+ state: TaskInstanceState | str | None
+ pod_name: str
+ namespace: str
+ resource_version: str
+ failure_details: FailureDetails | None
+
+
+class KubernetesWatch(NamedTuple):
+ """Watch event data from Kubernetes pods."""
+
+ pod_name: str
+ namespace: str
+ state: TaskInstanceState | str | None
+ annotations: dict[str, str]
+ resource_version: str
+ failure_details: FailureDetails | None
+
+
+# TODO: Remove after Airflow 2 support is removed
+CommandType = "Sequence[str]"
- # TODO: Remove after Airflow 2 support is removed
- CommandType = Sequence[str]
- # TaskInstance key, command, configuration, pod_template_file
- KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]
+class KubernetesJob(NamedTuple):
+ """Job definition for Kubernetes execution."""
- # key, pod state, pod_name, namespace, resource_version, failure_details
- KubernetesResultsType = tuple[
- TaskInstanceKey, TaskInstanceState | str | None, str, str, str,
FailureDetails | None
- ]
+ key: TaskInstanceKey
+ command: Sequence[str]
+ kube_executor_config: Any
+ pod_template_file: str | None
- # pod_name, namespace, pod state, annotations, resource_version,
failure_details
- KubernetesWatchType = tuple[
- str, str, TaskInstanceState | str | None, dict[str, str], str,
FailureDetails | None
- ]
ALL_NAMESPACES = "ALL_NAMESPACES"
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 43a03ada7d9..1ad05150b89 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -35,6 +35,9 @@ from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
POD_EXECUTOR_DONE_KEY,
POD_REVOKED_KEY,
FailureDetails,
+ KubernetesJob,
+ KubernetesResults,
+ KubernetesWatch,
)
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
@@ -50,12 +53,6 @@ from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from kubernetes.client import Configuration, models as k8s
- from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import (
- KubernetesJobType,
- KubernetesResultsType,
- KubernetesWatchType,
- )
-
class ResourceVersion(metaclass=Singleton):
"""Singleton for tracking resourceVersion from Kubernetes."""
@@ -69,7 +66,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
def __init__(
self,
namespace: str,
- watcher_queue: Queue[KubernetesWatchType],
+ watcher_queue: Queue[KubernetesWatch],
resource_version: str | None,
scheduler_job_id: str,
kube_config: Configuration,
@@ -217,7 +214,9 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
# So, there is no change in the pod state.
# However, need to free the executor slot from the current
executor.
self.log.info("Event: pod %s adopted, annotations: %s", pod_name,
annotations_string)
- self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations,
resource_version, None))
+ self.watcher_queue.put(
+ KubernetesWatch(pod_name, namespace, ADOPTED, annotations,
resource_version, None)
+ )
elif hasattr(pod.status, "reason") and pod.status.reason ==
"ProviderFailed":
# Most likely this happens due to Kubernetes setup (virtual
kubelet, virtual nodes, etc.)
key = annotations_to_key(annotations=annotations)
@@ -229,7 +228,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
annotations_string,
)
self.watcher_queue.put(
- (
+ KubernetesWatch(
pod_name,
namespace,
TaskInstanceState.FAILED,
@@ -277,7 +276,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
task_key_str,
)
self.watcher_queue.put(
- (
+ KubernetesWatch(
pod_name,
namespace,
TaskInstanceState.FAILED,
@@ -306,7 +305,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
"Event: %s Failed, task: %s, annotations: %s", pod_name,
task_key_str, annotations_string
)
self.watcher_queue.put(
- (
+ KubernetesWatch(
pod_name,
namespace,
TaskInstanceState.FAILED,
@@ -317,7 +316,9 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
)
elif status == "Succeeded":
self.log.info("Event: %s Succeeded, annotations: %s", pod_name,
annotations_string)
- self.watcher_queue.put((pod_name, namespace, None, annotations,
resource_version, None))
+ self.watcher_queue.put(
+ KubernetesWatch(pod_name, namespace, None, annotations,
resource_version, None)
+ )
elif status == "Running":
# deletion_timestamp is set by kube server when a graceful
deletion is requested.
# since kube server have received request to delete pod set TI
state failed
@@ -328,7 +329,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
annotations_string,
)
self.watcher_queue.put(
- (
+ KubernetesWatch(
pod_name,
namespace,
TaskInstanceState.FAILED,
@@ -466,7 +467,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
def __init__(
self,
kube_config: Any,
- result_queue: Queue[KubernetesResultsType],
+ result_queue: Queue[KubernetesResults],
kube_client: client.CoreV1Api,
scheduler_job_id: str,
):
@@ -540,9 +541,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
ResourceVersion().resource_version[namespace] = "0"
self.kube_watchers[namespace] =
self._make_kube_watcher(namespace)
- def run_next(self, next_job: KubernetesJobType) -> None:
+ def run_next(self, next_job: KubernetesJob) -> None:
"""Receives the next job to run, builds the pod, and creates it."""
- key, command, kube_executor_config, pod_template_file = next_job
+ key = next_job.key
+ command = next_job.command
+ kube_executor_config = next_job.kube_executor_config
+ pod_template_file = next_job.pod_template_file
dag_id, task_id, run_id, try_number, map_index = key
if len(command) == 1:
@@ -660,19 +664,27 @@ class AirflowKubernetesScheduler(LoggingMixin):
finally:
self.watcher_queue.task_done()
- def process_watcher_task(self, task: KubernetesWatchType) -> None:
+ def process_watcher_task(self, task: KubernetesWatch) -> None:
"""Process the task by watcher."""
- pod_name, namespace, state, annotations, resource_version,
failure_details = task
self.log.debug(
"Attempting to finish pod; pod_name: %s; state: %s; annotations:
%s",
- pod_name,
- state,
- annotations_for_logging_task_metadata(annotations),
+ task.pod_name,
+ task.state,
+ annotations_for_logging_task_metadata(task.annotations),
)
- key = annotations_to_key(annotations=annotations)
+ key = annotations_to_key(annotations=task.annotations)
if key:
- self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
- self.result_queue.put((key, state, pod_name, namespace,
resource_version, failure_details))
+ self.log.debug("finishing job %s - %s (%s)", key, task.state,
task.pod_name)
+ self.result_queue.put(
+ KubernetesResults(
+ key,
+ task.state,
+ task.pod_name,
+ task.namespace,
+ task.resource_version,
+ task.failure_details,
+ )
+ )
def _flush_watcher_queue(self) -> None:
self.log.debug("Executor shutting down, watcher_queue approx.
size=%d", self.watcher_queue.qsize())
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 2ad7eeb7286..3d6bb97167f 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -38,6 +38,8 @@ from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor import (
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import (
ADOPTED,
+ KubernetesResults,
+ KubernetesWatch,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils
import (
AirflowKubernetesScheduler,
@@ -692,7 +694,8 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=1)
executor.running = {key}
- executor._change_state(key, State.RUNNING, "pod_name", "default")
+ results = KubernetesResults(key, State.RUNNING, "pod_name",
"default", "resource_version", None)
+ executor._change_state(results)
assert executor.event_buffer[key][0] == State.RUNNING
assert executor.running == {key}
finally:
@@ -710,7 +713,8 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=2)
executor.running = {key}
- executor._change_state(key, State.SUCCESS, "pod_name", "default")
+ results = KubernetesResults(key, State.SUCCESS, "pod_name",
"default", "resource_version", None)
+ executor._change_state(results)
assert executor.event_buffer[key][0] == State.SUCCESS
assert executor.running == set()
mock_delete_pod.assert_called_once_with(pod_name="pod_name",
namespace="default")
@@ -735,7 +739,10 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=3)
executor.running = {key}
- executor._change_state(key, State.FAILED, "pod_id",
"test-namespace")
+ results = KubernetesResults(
+ key, State.FAILED, "pod_id", "test-namespace",
"resource_version", None
+ )
+ executor._change_state(results)
assert executor.event_buffer[key][0] == State.FAILED
assert executor.running == set()
mock_delete_pod.assert_not_called()
@@ -767,7 +774,8 @@ class TestKubernetesExecutor:
ti = create_task_instance(state=ti_state)
key = ti.key
executor.running = {key}
- executor._change_state(key, None, "pod_name", "default")
+ results = KubernetesResults(key, None, "pod_name", "default",
"resource_version", None)
+ executor._change_state(results)
assert executor.event_buffer[key][0] == ti_state
assert executor.running == set()
mock_delete_pod.assert_called_once_with(pod_name="pod_name",
namespace="default")
@@ -786,7 +794,8 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=2)
executor.running = {key}
- executor._change_state(key, ADOPTED, "pod_name", "default")
+ results = KubernetesResults(key, ADOPTED, "pod_name", "default",
"resource_version", None)
+ executor._change_state(results)
assert len(executor.event_buffer) == 0
assert len(executor.running) == 0
mock_delete_pod.assert_not_called()
@@ -802,7 +811,8 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=1)
executor.running = set()
- executor._change_state(key, State.SUCCESS, "pod_name", "default")
+ results = KubernetesResults(key, State.SUCCESS, "pod_name",
"default", "resource_version", None)
+ executor._change_state(results)
assert executor.event_buffer.get(key) is None
assert executor.running == set()
finally:
@@ -850,7 +860,10 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=2)
executor.running = {key}
- executor._change_state(key, State.SUCCESS, "pod_name",
"test-namespace")
+ results = KubernetesResults(
+ key, State.SUCCESS, "pod_name", "test-namespace",
"resource_version", None
+ )
+ executor._change_state(results)
assert executor.event_buffer[key][0] == State.SUCCESS
assert executor.running == set()
mock_delete_pod.assert_not_called()
@@ -876,7 +889,10 @@ class TestKubernetesExecutor:
try:
key = TaskInstanceKey(dag_id="dag_id", task_id="task_id",
run_id="run_id", try_number=2)
executor.running = {key}
- executor._change_state(key, State.FAILED, "pod_name",
"test-namespace")
+ results = KubernetesResults(
+ key, State.FAILED, "pod_name", "test-namespace",
"resource_version", None
+ )
+ executor._change_state(results)
assert executor.event_buffer[key][0] == State.FAILED
assert executor.running == set()
mock_delete_pod.assert_called_once_with(pod_name="pod_name",
namespace="test-namespace")
@@ -1492,7 +1508,7 @@ class TestKubernetesJobWatcher:
def assert_watcher_queue_called_once_with_state(self, state):
self.watcher.watcher_queue.put.assert_called_once_with(
- (
+ KubernetesWatch(
self.pod.metadata.name,
self.watcher.namespace,
state,
@@ -1729,7 +1745,7 @@ class TestKubernetesJobWatcher:
self._run()
self.watcher.watcher_queue.put.assert_called_once_with(
- (
+ KubernetesWatch(
self.pod.metadata.name,
self.watcher.namespace,
ADOPTED,