This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 49108e15eb Kubernetes executor running slots leak fix (#36240)
49108e15eb is described below
commit 49108e15eb2eb30e2ccb95c9332db7b38d35f2de
Author: Gopal Dirisala <[email protected]>
AuthorDate: Wed Dec 20 20:42:40 2023 +0530
Kubernetes executor running slots leak fix (#36240)
---------
Co-authored-by: gopal <[email protected]>
---
.../kubernetes/executors/kubernetes_executor.py | 16 ++++++-
.../executors/kubernetes_executor_types.py | 7 ++--
.../executors/kubernetes_executor_utils.py | 9 +++-
.../executors/test_kubernetes_executor.py | 49 ++++++++++++++++++++--
4 files changed, 71 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 6e32c00473..a5d911f8ce 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -75,7 +75,10 @@ except ImportError:
raise
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
+from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
+ ADOPTED,
+ POD_EXECUTOR_DONE_KEY,
+)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.utils.event_scheduler import EventScheduler
@@ -463,7 +466,7 @@ class KubernetesExecutor(BaseExecutor):
def _change_state(
self,
key: TaskInstanceKey,
- state: TaskInstanceState | None,
+ state: TaskInstanceState | str | None,
pod_name: str,
namespace: str,
session: Session = NEW_SESSION,
@@ -471,6 +474,15 @@ class KubernetesExecutor(BaseExecutor):
if TYPE_CHECKING:
assert self.kube_scheduler
+ if state == ADOPTED:
+ # When the task pod is adopted by another executor,
+ # then remove the task from the current executor running queue.
+ try:
+ self.running.remove(key)
+ except KeyError:
+ self.log.debug("TI key not in running: %s", key)
+ return
+
if state == TaskInstanceState.RUNNING:
self.event_buffer[key] = state, None
return
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
index 80b8f1de72..4229136298 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
@@ -16,8 +16,9 @@
# under the License.
from __future__ import annotations
-from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
+ADOPTED = "adopted"
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
from airflow.models.taskinstance import TaskInstanceKey
@@ -27,10 +28,10 @@ if TYPE_CHECKING:
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
# key, pod state, pod_name, namespace, resource_version
- KubernetesResultsType = Tuple[TaskInstanceKey, Optional[TaskInstanceState], str, str, str]
+ KubernetesResultsType = Tuple[TaskInstanceKey, Optional[Union[TaskInstanceState, str]], str, str, str]
# pod_name, namespace, pod state, annotations, resource_version
- KubernetesWatchType = Tuple[str, str, Optional[TaskInstanceState], Dict[str, str], str]
+ KubernetesWatchType = Tuple[str, str, Optional[Union[TaskInstanceState, str]], Dict[str, str], str]
ALL_NAMESPACES = "ALL_NAMESPACES"
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 074b65d198..09b47aea19 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -40,6 +40,7 @@ from airflow.utils.state import TaskInstanceState
try:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
+ ADOPTED,
ALL_NAMESPACES,
POD_EXECUTOR_DONE_KEY,
)
@@ -220,7 +221,13 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
pod = event["object"]
annotations_string = annotations_for_logging_task_metadata(annotations)
"""Process status response."""
- if status == "Pending":
+ if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
+ # This will happen only when the task pods are adopted by another executor.
+ # So, there is no change in the pod state.
+ # However, need to free the executor slot from the current executor.
+ self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string)
+ self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version))
+ elif status == "Pending":
# deletion_timestamp is set by kube server when a graceful deletion is requested.
# since kube server have received request to delete pod set TI state failed
if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index a0b187087a..15b17515c1 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -44,7 +44,10 @@ try:
KubernetesExecutor,
PodReconciliationError,
)
- from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
+ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
+ ADOPTED,
+ POD_EXECUTOR_DONE_KEY,
+ )
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
KubernetesJobWatcher,
@@ -644,6 +647,25 @@ class TestKubernetesExecutor:
finally:
executor.end()
+ @pytest.mark.db_test
+ @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+ @mock.patch(
+ "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
+ )
+ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
+ executor = self.kubernetes_executor
+ executor.start()
+ try:
+ key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor.running = {key}
+ executor._change_state(key, ADOPTED, "pod_name", "default")
+ assert len(executor.event_buffer) == 0
+ assert len(executor.running) == 0
+ mock_delete_pod.assert_not_called()
+ finally:
+ executor.end()
+
@pytest.mark.db_test
@pytest.mark.parametrize(
"multi_namespace_mode_namespace_list, watchers_keys",
@@ -1431,12 +1453,31 @@ class TestKubernetesJobWatcher:
self._run()
self.watcher.watcher_queue.put.assert_not_called()
- def test_process_status_succeeded_type_delete(self):
- self.pod.status.phase = "Succeeded"
+ @pytest.mark.parametrize(
+ "ti_state",
+ [
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.FAILED,
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.QUEUED,
+ TaskInstanceState.UP_FOR_RETRY,
+ ],
+ )
+ def test_process_status_pod_adopted(self, ti_state):
+ self.pod.status.phase = ti_state
self.events.append({"type": "DELETED", "object": self.pod})
+ self.pod.metadata.deletion_timestamp = None
self._run()
- self.watcher.watcher_queue.put.assert_not_called()
+ self.watcher.watcher_queue.put.assert_called_once_with(
+ (
+ self.pod.metadata.name,
+ self.watcher.namespace,
+ ADOPTED,
+ self.core_annotations,
+ self.pod.metadata.resource_version,
+ )
+ )
def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"