This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8e3abe4180 Fix ``KubernetesPodOperator`` with ``KubernetesExecutor`` on
2.3.0 (#23371)
8e3abe4180 is described below
commit 8e3abe418021a3ba241ead1cad79a1c5b492c587
Author: Jed Cunningham <[email protected]>
AuthorDate: Fri Apr 29 15:35:44 2022 -0600
Fix ``KubernetesPodOperator`` with ``KubernetesExecutor`` on 2.3.0 (#23371)
KubernetesPodOperator was mistakenly trying to reattach to its
KubernetesExecutor worker, where it would get stuck watching itself for
logs. We will properly filter for KPOs only, and ignore
KubernetesExecutor workers for good measure.
---
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 10 +++++++---
.../providers/cncf/kubernetes/operators/test_kubernetes_pod.py | 5 ++++-
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 8a79658381..d075915bf7 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -290,7 +290,12 @@ class KubernetesPodOperator(BaseOperator):
ti = context['ti']
run_id = context['run_id']
- labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
+ labels = {
+ 'dag_id': ti.dag_id,
+ 'task_id': ti.task_id,
+ 'run_id': run_id,
+ 'kubernetes_pod_operator': 'True',
+ }
# If running on Airflow 2.3+:
map_index = getattr(ti, 'map_index', -1)
@@ -433,7 +438,7 @@ class KubernetesPodOperator(BaseOperator):
def _build_find_pod_label_selector(self, context: Optional[dict] = None)
-> str:
labels = self._get_ti_pod_labels(context, include_try_number=False)
label_strings = [f'{label_id}={label}' for label_id, label in
sorted(labels.items())]
- return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+ return ','.join(label_strings) +
f',{self.POD_CHECKED_KEY}!=True,!airflow-worker'
def _set_name(self, name):
if name is None:
@@ -541,7 +546,6 @@ class KubernetesPodOperator(BaseOperator):
pod.metadata.labels.update(
{
'airflow_version': airflow_version.replace('+', '-'),
- 'kubernetes_pod_operator': 'True',
}
)
pod_mutation_hook(pod)
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index ee2a3cdb43..e70bf88326 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -228,7 +228,10 @@ class TestKubernetesPodOperator:
self.run_pod(k)
self.client_mock.return_value.list_namespaced_pod.assert_called_once()
_, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
- assert kwargs['label_selector'] ==
'dag_id=dag,run_id=test,task_id=task,already_checked!=True'
+ assert kwargs['label_selector'] == (
+ 'dag_id=dag,kubernetes_pod_operator=True,run_id=test,task_id=task,'
+ 'already_checked!=True,!airflow-worker'
+ )
def test_image_pull_secrets_correctly_set(self):
fake_pull_secrets = "fakeSecret"