This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 700b43c0731 Refactor Kubernetes Executor Types to NamedTuples (#54858)
700b43c0731 is described below

commit 700b43c0731c8c6c90604c1c3066fd30004b50dd
Author: Gary Hsu <[email protected]>
AuthorDate: Wed Aug 27 16:58:31 2025 +0800

    Refactor Kubernetes Executor Types to NamedTuples (#54858)
---
 .../kubernetes_tests/test_kubernetes_executor.py   | 31 ++++++++---
 .../kubernetes/executors/kubernetes_executor.py    | 51 +++++++++---------
 .../executors/kubernetes_executor_types.py         | 54 +++++++++++++------
 .../executors/kubernetes_executor_utils.py         | 60 +++++++++++++---------
 .../executors/test_kubernetes_executor.py          | 36 +++++++++----
 5 files changed, 149 insertions(+), 83 deletions(-)

diff --git 
a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py 
b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
index 4d14c75864d..f7184acaab6 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
@@ -24,6 +24,7 @@ import pytest
 if TYPE_CHECKING:
     from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import FailureDetails
 
+from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import KubernetesResults
 from kubernetes_tests.test_base import (
     EXECUTOR,
     BaseK8STest,  # isort:skip (needed to workaround isort bug)
@@ -138,15 +139,19 @@ class TestKubernetesExecutor(BaseK8STest):
         # Create a test task key
         task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", 
run_id="test_run", try_number=1)
 
-        # Call _change_state with FAILED status and failure details
-        executor._change_state(
+        # Create KubernetesResults object
+        results = KubernetesResults(
             key=task_key,
             state=TaskInstanceState.FAILED,
             pod_name="test-pod",
             namespace="test-namespace",
+            resource_version="123",
             failure_details=failure_details,
         )
 
+        # Call _change_state with KubernetesResults object
+        executor._change_state(results)
+
         # Verify that the warning log was called with expected parameters
         mock_log.warning.assert_called_once_with(
             "Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: 
%s, "
@@ -181,15 +186,19 @@ class TestKubernetesExecutor(BaseK8STest):
         # Create a test task key
         task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", 
run_id="test_run", try_number=1)
 
-        # Call _change_state with FAILED status but no failure details
-        executor._change_state(
+        # Create KubernetesResults object without failure details
+        results = KubernetesResults(
             key=task_key,
             state=TaskInstanceState.FAILED,
             pod_name="test-pod",
             namespace="test-namespace",
+            resource_version="123",
             failure_details=None,
         )
 
+        # Call _change_state with KubernetesResults object
+        executor._change_state(results)
+
         # Verify that the warning log was called with the correct parameters
         mock_log.warning.assert_called_once_with(
             "Task %s failed in pod %s/%s (no details available)",
@@ -214,11 +223,19 @@ class TestKubernetesExecutor(BaseK8STest):
         # Create a test task key
         task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", 
run_id="test_run", try_number=1)
 
-        # Call _change_state with SUCCESS status
-        executor._change_state(
-            key=task_key, state=TaskInstanceState.SUCCESS, 
pod_name="test-pod", namespace="test-namespace"
+        # Create KubernetesResults object with SUCCESS state
+        results = KubernetesResults(
+            key=task_key,
+            state=TaskInstanceState.SUCCESS,
+            pod_name="test-pod",
+            namespace="test-namespace",
+            resource_version="123",
+            failure_details=None,
         )
 
+        # Call _change_state with KubernetesResults object
+        executor._change_state(results)
+
         # Verify that no failure logs were called
         mock_log.error.assert_not_called()
         mock_log.warning.assert_not_called()
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index d2f6c1b5df6..ee5dfa9db73 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -66,7 +66,8 @@ from airflow.providers.cncf.kubernetes.exceptions import 
PodMutationHookExceptio
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import (
     ADOPTED,
     POD_EXECUTOR_DONE_KEY,
-    FailureDetails,
+    KubernetesJob,
+    KubernetesResults,
 )
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
annotations_to_key
@@ -86,10 +87,6 @@ if TYPE_CHECKING:
     from airflow.executors import workloads
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
-    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import (
-        KubernetesJobType,
-        KubernetesResultsType,
-    )
     from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils 
import (
         AirflowKubernetesScheduler,
     )
@@ -157,8 +154,8 @@ class KubernetesExecutor(BaseExecutor):
     def __init__(self):
         self.kube_config = KubeConfig()
         self._manager = multiprocessing.Manager()
-        self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
-        self.result_queue: Queue[KubernetesResultsType] = self._manager.Queue()
+        self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
+        self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None
@@ -280,7 +277,7 @@ class KubernetesExecutor(BaseExecutor):
         else:
             pod_template_file = None
         self.event_buffer[key] = (TaskInstanceState.QUEUED, 
self.scheduler_job_id)
-        self.task_queue.put((key, command, kube_executor_config, 
pod_template_file))
+        self.task_queue.put(KubernetesJob(key, command, kube_executor_config, 
pod_template_file))
         # We keep a temporary local record that we've handled this so we don't
         # try and remove it from the QUEUED state while we process it
         self.last_handled[key] = time.time()
@@ -331,17 +328,16 @@ class KubernetesExecutor(BaseExecutor):
             while True:
                 results = self.result_queue.get_nowait()
                 try:
-                    key, state, pod_name, namespace, resource_version, 
failure_details = results
-                    last_resource_version[namespace] = resource_version
-                    self.log.info("Changing state of %s to %s", results, state)
+                    last_resource_version[results.namespace] = 
results.resource_version
+                    self.log.info("Changing state of %s to %s", results, 
results.state)
                     try:
-                        self._change_state(key, state, pod_name, namespace, 
failure_details)
+                        self._change_state(results)
                     except Exception as e:
                         self.log.exception(
                             "Exception: %s when attempting to change state of 
%s to %s, re-queueing.",
                             e,
                             results,
-                            state,
+                            results.state,
                         )
                         self.result_queue.put(results)
                 finally:
@@ -362,7 +358,7 @@ class KubernetesExecutor(BaseExecutor):
                 task = self.task_queue.get_nowait()
 
                 try:
-                    key, command, kube_executor_config, pod_template_file = 
task
+                    key = task.key
                     self.kube_scheduler.run_next(task)
                     self.task_publish_retries.pop(key, None)
                 except PodReconciliationError as e:
@@ -391,11 +387,11 @@ class KubernetesExecutor(BaseExecutor):
                         self.task_publish_retries[key] = retries + 1
                     else:
                         self.log.error("Pod creation failed with reason %r. 
Failing task", e.reason)
-                        key, _, _, _ = task
+                        key = task.key
                         self.fail(key, e)
                         self.task_publish_retries.pop(key, None)
                 except PodMutationHookException as e:
-                    key, _, _, _ = task
+                    key = task.key
                     self.log.error(
                         "Pod Mutation Hook failed for the task %s. Failing 
task. Details: %s",
                         key,
@@ -408,16 +404,19 @@ class KubernetesExecutor(BaseExecutor):
     @provide_session
     def _change_state(
         self,
-        key: TaskInstanceKey,
-        state: TaskInstanceState | str | None,
-        pod_name: str,
-        namespace: str,
-        failure_details: FailureDetails | None = None,
+        results: KubernetesResults,
         session: Session = NEW_SESSION,
     ) -> None:
+        """Change state of the task based on KubernetesResults."""
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        key = results.key
+        state = results.state
+        pod_name = results.pod_name
+        namespace = results.namespace
+        failure_details = results.failure_details
+
         if state == TaskInstanceState.FAILED:
             # Use pre-collected failure details from the watcher to avoid 
additional API calls
             if failure_details:
@@ -734,18 +733,20 @@ class KubernetesExecutor(BaseExecutor):
                 results = self.result_queue.get_nowait()
                 self.log.warning("Executor shutting down, flushing 
results=%s", results)
                 try:
-                    key, state, pod_name, namespace, resource_version, 
failure_details = results
                     self.log.info(
-                        "Changing state of %s to %s : resource_version=%d", 
results, state, resource_version
+                        "Changing state of %s to %s : resource_version=%s",
+                        results,
+                        results.state,
+                        results.resource_version,
                     )
                     try:
-                        self._change_state(key, state, pod_name, namespace, 
failure_details)
+                        self._change_state(results)
                     except Exception as e:
                         self.log.exception(
                             "Ignoring exception: %s when attempting to change 
state of %s to %s.",
                             e,
                             results,
-                            state,
+                            results.state,
                         )
                 finally:
                     self.result_queue.task_done()
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
index a475959e588..f8e03f1f04c 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
@@ -16,7 +16,14 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Literal, TypedDict
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from airflow.models.taskinstance import TaskInstanceKey
+    from airflow.utils.state import TaskInstanceState
+
 
 ADOPTED = "adopted"
 
@@ -35,27 +42,40 @@ class FailureDetails(TypedDict, total=False):
     container_name: str | None
 
 
-if TYPE_CHECKING:
-    from collections.abc import Sequence
+class KubernetesResults(NamedTuple):
+    """Results from Kubernetes task execution."""
 
-    from airflow.models.taskinstance import TaskInstanceKey
-    from airflow.utils.state import TaskInstanceState
+    key: TaskInstanceKey
+    state: TaskInstanceState | str | None
+    pod_name: str
+    namespace: str
+    resource_version: str
+    failure_details: FailureDetails | None
+
+
+class KubernetesWatch(NamedTuple):
+    """Watch event data from Kubernetes pods."""
+
+    pod_name: str
+    namespace: str
+    state: TaskInstanceState | str | None
+    annotations: dict[str, str]
+    resource_version: str
+    failure_details: FailureDetails | None
+
+
+# TODO: Remove after Airflow 2 support is removed
+CommandType = "Sequence[str]"
 
-    # TODO: Remove after Airflow 2 support is removed
-    CommandType = Sequence[str]
 
-    # TaskInstance key, command, configuration, pod_template_file
-    KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]
+class KubernetesJob(NamedTuple):
+    """Job definition for Kubernetes execution."""
 
-    # key, pod state, pod_name, namespace, resource_version, failure_details
-    KubernetesResultsType = tuple[
-        TaskInstanceKey, TaskInstanceState | str | None, str, str, str, 
FailureDetails | None
-    ]
+    key: TaskInstanceKey
+    command: Sequence[str]
+    kube_executor_config: Any
+    pod_template_file: str | None
 
-    # pod_name, namespace, pod state, annotations, resource_version, 
failure_details
-    KubernetesWatchType = tuple[
-        str, str, TaskInstanceState | str | None, dict[str, str], str, 
FailureDetails | None
-    ]
 
 ALL_NAMESPACES = "ALL_NAMESPACES"
 POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 43a03ada7d9..1ad05150b89 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -35,6 +35,9 @@ from 
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
     POD_EXECUTOR_DONE_KEY,
     POD_REVOKED_KEY,
     FailureDetails,
+    KubernetesJob,
+    KubernetesResults,
+    KubernetesWatch,
 )
 from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
@@ -50,12 +53,6 @@ from airflow.utils.state import TaskInstanceState
 if TYPE_CHECKING:
     from kubernetes.client import Configuration, models as k8s
 
-    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import (
-        KubernetesJobType,
-        KubernetesResultsType,
-        KubernetesWatchType,
-    )
-
 
 class ResourceVersion(metaclass=Singleton):
     """Singleton for tracking resourceVersion from Kubernetes."""
@@ -69,7 +66,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
     def __init__(
         self,
         namespace: str,
-        watcher_queue: Queue[KubernetesWatchType],
+        watcher_queue: Queue[KubernetesWatch],
         resource_version: str | None,
         scheduler_job_id: str,
         kube_config: Configuration,
@@ -217,7 +214,9 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
             # So, there is no change in the pod state.
             # However, need to free the executor slot from the current 
executor.
             self.log.info("Event: pod %s adopted, annotations: %s", pod_name, 
annotations_string)
-            self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, 
resource_version, None))
+            self.watcher_queue.put(
+                KubernetesWatch(pod_name, namespace, ADOPTED, annotations, 
resource_version, None)
+            )
         elif hasattr(pod.status, "reason") and pod.status.reason == 
"ProviderFailed":
             # Most likely this happens due to Kubernetes setup (virtual 
kubelet, virtual nodes, etc.)
             key = annotations_to_key(annotations=annotations)
@@ -229,7 +228,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                 annotations_string,
             )
             self.watcher_queue.put(
-                (
+                KubernetesWatch(
                     pod_name,
                     namespace,
                     TaskInstanceState.FAILED,
@@ -277,7 +276,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                                 task_key_str,
                             )
                             self.watcher_queue.put(
-                                (
+                                KubernetesWatch(
                                     pod_name,
                                     namespace,
                                     TaskInstanceState.FAILED,
@@ -306,7 +305,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                 "Event: %s Failed, task: %s, annotations: %s", pod_name, 
task_key_str, annotations_string
             )
             self.watcher_queue.put(
-                (
+                KubernetesWatch(
                     pod_name,
                     namespace,
                     TaskInstanceState.FAILED,
@@ -317,7 +316,9 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
             )
         elif status == "Succeeded":
             self.log.info("Event: %s Succeeded, annotations: %s", pod_name, 
annotations_string)
-            self.watcher_queue.put((pod_name, namespace, None, annotations, 
resource_version, None))
+            self.watcher_queue.put(
+                KubernetesWatch(pod_name, namespace, None, annotations, 
resource_version, None)
+            )
         elif status == "Running":
             # deletion_timestamp is set by kube server when a graceful 
deletion is requested.
             # since kube server have received request to delete pod set TI 
state failed
@@ -328,7 +329,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                     annotations_string,
                 )
                 self.watcher_queue.put(
-                    (
+                    KubernetesWatch(
                         pod_name,
                         namespace,
                         TaskInstanceState.FAILED,
@@ -466,7 +467,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
     def __init__(
         self,
         kube_config: Any,
-        result_queue: Queue[KubernetesResultsType],
+        result_queue: Queue[KubernetesResults],
         kube_client: client.CoreV1Api,
         scheduler_job_id: str,
     ):
@@ -540,9 +541,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
                 ResourceVersion().resource_version[namespace] = "0"
                 self.kube_watchers[namespace] = 
self._make_kube_watcher(namespace)
 
-    def run_next(self, next_job: KubernetesJobType) -> None:
+    def run_next(self, next_job: KubernetesJob) -> None:
         """Receives the next job to run, builds the pod, and creates it."""
-        key, command, kube_executor_config, pod_template_file = next_job
+        key = next_job.key
+        command = next_job.command
+        kube_executor_config = next_job.kube_executor_config
+        pod_template_file = next_job.pod_template_file
 
         dag_id, task_id, run_id, try_number, map_index = key
         if len(command) == 1:
@@ -660,19 +664,27 @@ class AirflowKubernetesScheduler(LoggingMixin):
                 finally:
                     self.watcher_queue.task_done()
 
-    def process_watcher_task(self, task: KubernetesWatchType) -> None:
+    def process_watcher_task(self, task: KubernetesWatch) -> None:
         """Process the task by watcher."""
-        pod_name, namespace, state, annotations, resource_version, 
failure_details = task
         self.log.debug(
             "Attempting to finish pod; pod_name: %s; state: %s; annotations: 
%s",
-            pod_name,
-            state,
-            annotations_for_logging_task_metadata(annotations),
+            task.pod_name,
+            task.state,
+            annotations_for_logging_task_metadata(task.annotations),
         )
-        key = annotations_to_key(annotations=annotations)
+        key = annotations_to_key(annotations=task.annotations)
         if key:
-            self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
-            self.result_queue.put((key, state, pod_name, namespace, 
resource_version, failure_details))
+            self.log.debug("finishing job %s - %s (%s)", key, task.state, 
task.pod_name)
+            self.result_queue.put(
+                KubernetesResults(
+                    key,
+                    task.state,
+                    task.pod_name,
+                    task.namespace,
+                    task.resource_version,
+                    task.failure_details,
+                )
+            )
 
     def _flush_watcher_queue(self) -> None:
         self.log.debug("Executor shutting down, watcher_queue approx. 
size=%d", self.watcher_queue.qsize())
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 2ad7eeb7286..3d6bb97167f 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -38,6 +38,8 @@ from 
airflow.providers.cncf.kubernetes.executors.kubernetes_executor import (
 )
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import (
     ADOPTED,
+    KubernetesResults,
+    KubernetesWatch,
 )
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils 
import (
     AirflowKubernetesScheduler,
@@ -692,7 +694,8 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=1)
             executor.running = {key}
-            executor._change_state(key, State.RUNNING, "pod_name", "default")
+            results = KubernetesResults(key, State.RUNNING, "pod_name", 
"default", "resource_version", None)
+            executor._change_state(results)
             assert executor.event_buffer[key][0] == State.RUNNING
             assert executor.running == {key}
         finally:
@@ -710,7 +713,8 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=2)
             executor.running = {key}
-            executor._change_state(key, State.SUCCESS, "pod_name", "default")
+            results = KubernetesResults(key, State.SUCCESS, "pod_name", 
"default", "resource_version", None)
+            executor._change_state(results)
             assert executor.event_buffer[key][0] == State.SUCCESS
             assert executor.running == set()
             mock_delete_pod.assert_called_once_with(pod_name="pod_name", 
namespace="default")
@@ -735,7 +739,10 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=3)
             executor.running = {key}
-            executor._change_state(key, State.FAILED, "pod_id", 
"test-namespace")
+            results = KubernetesResults(
+                key, State.FAILED, "pod_id", "test-namespace", 
"resource_version", None
+            )
+            executor._change_state(results)
             assert executor.event_buffer[key][0] == State.FAILED
             assert executor.running == set()
             mock_delete_pod.assert_not_called()
@@ -767,7 +774,8 @@ class TestKubernetesExecutor:
             ti = create_task_instance(state=ti_state)
             key = ti.key
             executor.running = {key}
-            executor._change_state(key, None, "pod_name", "default")
+            results = KubernetesResults(key, None, "pod_name", "default", 
"resource_version", None)
+            executor._change_state(results)
             assert executor.event_buffer[key][0] == ti_state
             assert executor.running == set()
             mock_delete_pod.assert_called_once_with(pod_name="pod_name", 
namespace="default")
@@ -786,7 +794,8 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=2)
             executor.running = {key}
-            executor._change_state(key, ADOPTED, "pod_name", "default")
+            results = KubernetesResults(key, ADOPTED, "pod_name", "default", 
"resource_version", None)
+            executor._change_state(results)
             assert len(executor.event_buffer) == 0
             assert len(executor.running) == 0
             mock_delete_pod.assert_not_called()
@@ -802,7 +811,8 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=1)
             executor.running = set()
-            executor._change_state(key, State.SUCCESS, "pod_name", "default")
+            results = KubernetesResults(key, State.SUCCESS, "pod_name", 
"default", "resource_version", None)
+            executor._change_state(results)
             assert executor.event_buffer.get(key) is None
             assert executor.running == set()
         finally:
@@ -850,7 +860,10 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=2)
             executor.running = {key}
-            executor._change_state(key, State.SUCCESS, "pod_name", 
"test-namespace")
+            results = KubernetesResults(
+                key, State.SUCCESS, "pod_name", "test-namespace", 
"resource_version", None
+            )
+            executor._change_state(results)
             assert executor.event_buffer[key][0] == State.SUCCESS
             assert executor.running == set()
             mock_delete_pod.assert_not_called()
@@ -876,7 +889,10 @@ class TestKubernetesExecutor:
         try:
             key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", 
run_id="run_id", try_number=2)
             executor.running = {key}
-            executor._change_state(key, State.FAILED, "pod_name", 
"test-namespace")
+            results = KubernetesResults(
+                key, State.FAILED, "pod_name", "test-namespace", 
"resource_version", None
+            )
+            executor._change_state(results)
             assert executor.event_buffer[key][0] == State.FAILED
             assert executor.running == set()
             mock_delete_pod.assert_called_once_with(pod_name="pod_name", 
namespace="test-namespace")
@@ -1492,7 +1508,7 @@ class TestKubernetesJobWatcher:
 
     def assert_watcher_queue_called_once_with_state(self, state):
         self.watcher.watcher_queue.put.assert_called_once_with(
-            (
+            KubernetesWatch(
                 self.pod.metadata.name,
                 self.watcher.namespace,
                 state,
@@ -1729,7 +1745,7 @@ class TestKubernetesJobWatcher:
 
         self._run()
         self.watcher.watcher_queue.put.assert_called_once_with(
-            (
+            KubernetesWatch(
                 self.pod.metadata.name,
                 self.watcher.namespace,
                 ADOPTED,

Reply via email to