This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 164526d4c7 Consider custom pod labels on pod finding process on 
`KubernetesPodOperator` (#33057)
164526d4c7 is described below

commit 164526d4c798a72dba3087d71f30f60f60595b0e
Author: Changhoon Oh <[email protected]>
AuthorDate: Sat Aug 5 03:41:21 2023 +0900

    Consider custom pod labels on pod finding process on 
`KubernetesPodOperator` (#33057)
    
    * consider custom pod labels on pod finding process on KubernetesPodOperator
    
    ---------
    
    Co-authored-by: eladkal <[email protected]>
---
 airflow/providers/cncf/kubernetes/operators/pod.py    |  5 ++++-
 tests/providers/cncf/kubernetes/operators/test_pod.py | 12 +++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 0b6c90c987..d72b2fec0c 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -762,7 +762,10 @@ class KubernetesPodOperator(BaseOperator):
                     self.log.info("Skipping deleting pod: %s", 
pod.metadata.name)
 
     def _build_find_pod_label_selector(self, context: Context | None = None, 
*, exclude_checked=True) -> str:
-        labels = self._get_ti_pod_labels(context, include_try_number=False)
+        labels = {
+            **self.labels,
+            **self._get_ti_pod_labels(context, include_try_number=False),
+        }
         label_strings = [f"{label_id}={label}" for label_id, label in 
sorted(labels.items())]
         labels_value = ",".join(label_strings)
         if exclude_checked:
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index bcd2bd0a31..2c6dc2188a 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -312,6 +312,16 @@ class TestKubernetesPodOperator:
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
         }
 
+    def test_find_custom_pod_labels(self):
+        k = KubernetesPodOperator(
+            labels={"foo": "bar", "hello": "airflow"},
+            name="test",
+            task_id="task",
+        )
+        context = create_context(k)
+        label_selector = k._build_find_pod_label_selector(context)
+        assert "foo=bar" in label_selector and "hello=airflow" in 
label_selector
+
     @patch(HOOK_CLASS, new=MagicMock)
     def test_find_pod_labels(self):
         k = KubernetesPodOperator(
@@ -327,7 +337,7 @@ class TestKubernetesPodOperator:
         self.run_pod(k)
         _, kwargs = k.client.list_namespaced_pod.call_args
         assert kwargs["label_selector"] == (
-            "dag_id=dag,kubernetes_pod_operator=True,run_id=test,task_id=task,"
+            
"dag_id=dag,foo=bar,kubernetes_pod_operator=True,run_id=test,task_id=task,"
             "already_checked!=True,!airflow-worker"
         )
 

Reply via email to