This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e3abe4180 Fix ``KubernetesPodOperator`` with ``KubernetesExecutor`` on 
2.3.0 (#23371)
8e3abe4180 is described below

commit 8e3abe418021a3ba241ead1cad79a1c5b492c587
Author: Jed Cunningham <[email protected]>
AuthorDate: Fri Apr 29 15:35:44 2022 -0600

    Fix ``KubernetesPodOperator`` with ``KubernetesExecutor`` on 2.3.0 (#23371)
    
    KubernetesPodOperator was mistakenly trying to reattach to its
    KubernetesExecutor worker, where it would get stuck watching itself for
    logs. We will properly filter for KPOs only, and ignore
    KubernetesExecutor workers for good measure.
---
 airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py  | 10 +++++++---
 .../providers/cncf/kubernetes/operators/test_kubernetes_pod.py |  5 ++++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 8a79658381..d075915bf7 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -290,7 +290,12 @@ class KubernetesPodOperator(BaseOperator):
         ti = context['ti']
         run_id = context['run_id']
 
-        labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
+        labels = {
+            'dag_id': ti.dag_id,
+            'task_id': ti.task_id,
+            'run_id': run_id,
+            'kubernetes_pod_operator': 'True',
+        }
 
         # If running on Airflow 2.3+:
         map_index = getattr(ti, 'map_index', -1)
@@ -433,7 +438,7 @@ class KubernetesPodOperator(BaseOperator):
     def _build_find_pod_label_selector(self, context: Optional[dict] = None) 
-> str:
         labels = self._get_ti_pod_labels(context, include_try_number=False)
         label_strings = [f'{label_id}={label}' for label_id, label in 
sorted(labels.items())]
-        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+        return ','.join(label_strings) + 
f',{self.POD_CHECKED_KEY}!=True,!airflow-worker'
 
     def _set_name(self, name):
         if name is None:
@@ -541,7 +546,6 @@ class KubernetesPodOperator(BaseOperator):
         pod.metadata.labels.update(
             {
                 'airflow_version': airflow_version.replace('+', '-'),
-                'kubernetes_pod_operator': 'True',
             }
         )
         pod_mutation_hook(pod)
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index ee2a3cdb43..e70bf88326 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -228,7 +228,10 @@ class TestKubernetesPodOperator:
         self.run_pod(k)
         self.client_mock.return_value.list_namespaced_pod.assert_called_once()
         _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
-        assert kwargs['label_selector'] == 
'dag_id=dag,run_id=test,task_id=task,already_checked!=True'
+        assert kwargs['label_selector'] == (
+            'dag_id=dag,kubernetes_pod_operator=True,run_id=test,task_id=task,'
+            'already_checked!=True,!airflow-worker'
+        )
 
     def test_image_pull_secrets_correctly_set(self):
         fake_pull_secrets = "fakeSecret"

Reply via email to