This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c9225a9452 added both different namespace for taskmanager or for global namespace (#53653)
9c9225a9452 is described below

commit 9c9225a945299b8501c65acd3498e92cea46e3a8
Author: Nataneljpwd <[email protected]>
AuthorDate: Mon Jul 28 10:48:17 2025 +0300

    added both different namespace for taskmanager or for global namespace (#53653)
    
    * added both different namespace for taskmanager or for global namespace
    
    * added tests
    
    ---------
    
    Co-authored-by: Natanel Rudyuklakir <[email protected]>
---
 .../apache/flink/sensors/flink_kubernetes.py       |  6 +-
 .../apache/flink/sensors/test_flink_kubernetes.py  | 70 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py b/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py
index ad0cc1cfa54..4d86aa3d5ae 100644
--- a/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py
+++ b/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py
@@ -62,6 +62,7 @@ class FlinkKubernetesSensor(BaseSensorOperator):
         api_group: str = "flink.apache.org",
         api_version: str = "v1beta1",
         plural: str = "flinkdeployments",
+        taskmanager_pods_namespace: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -73,6 +74,7 @@ class FlinkKubernetesSensor(BaseSensorOperator):
         self.api_group = api_group
         self.api_version = api_version
         self.plural = plural
+        self.taskmanager_pods_namespace = taskmanager_pods_namespace
 
     def _log_driver(self, application_state: str, response: dict) -> None:
         log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info
@@ -88,7 +90,9 @@ class FlinkKubernetesSensor(BaseSensorOperator):
 
         task_manager_labels = status_info["taskManager"]["labelSelector"]
         all_pods = self.hook.get_namespaced_pod_list(
-            namespace="default", watch=False, label_selector=task_manager_labels
+            namespace=self.taskmanager_pods_namespace or self.namespace or "default",
+            watch=False,
+            label_selector=task_manager_labels,
         )
 
         namespace = response["metadata"]["namespace"]
diff --git a/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py b/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py
index 0df129be503..cbaf52e0aca 100644
--- a/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py
+++ b/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py
@@ -1140,6 +1140,76 @@ class TestFlinkKubernetesSensor:
             name="flink-stream-example",
         )
 
+    @patch(
+        "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
+        return_value=TEST_READY_CLUSTER,
+    )
+    @patch("logging.Logger.info")
+    @patch(
+        "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs",
+        return_value=TEST_POD_LOGS,
+    )
+    @patch(
+        "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_namespaced_pod_list",
+        return_value=TASK_MANAGER_POD_LIST,
+    )
+    def test_logging_taskmanager_from_taskmanager_namespace_when_namespace_is_set(
+        self, mock_namespaced_pod_list, mock_pod_logs, info_log_call, mock_namespaced_crd, mock_kube_conn
+    ):
+        namespace = "different-namespace123456"
+        namespae_name = "test123"
+
+        sensor = FlinkKubernetesSensor(
+            application_name="flink-stream-example",
+            namespace=namespace,
+            taskmanager_pods_namespace=namespae_name,
+            attach_log=True,
+            dag=self.dag,
+            task_id="test_task_id",
+        )
+
+        sensor.poke(context=None)
+
+        mock_namespaced_pod_list.assert_called_once_with(
+            namespace=namespae_name,
+            watch=False,
+            label_selector="component=taskmanager,app=flink-stream-example",
+        )
+
+    @patch(
+        "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
+        return_value=TEST_READY_CLUSTER,
+    )
+    @patch("logging.Logger.info")
+    @patch(
+        "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs",
+        return_value=TEST_POD_LOGS,
+    )
+    @patch(
+        "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_namespaced_pod_list",
+        return_value=TASK_MANAGER_POD_LIST,
+    )
+    def test_logging_taskmanager_from_non_default_namespace(
+        self, mock_namespaced_pod_list, mock_pod_logs, info_log_call, mock_namespaced_crd, mock_kube_conn
+    ):
+        namespae_name = "test123"
+
+        sensor = FlinkKubernetesSensor(
+            application_name="flink-stream-example",
+            namespace=namespae_name,
+            attach_log=True,
+            dag=self.dag,
+            task_id="test_task_id",
+        )
+
+        sensor.poke(context=None)
+
+        mock_namespaced_pod_list.assert_called_once_with(
+            namespace=namespae_name,
+            watch=False,
+            label_selector="component=taskmanager,app=flink-stream-example",
+        )
+
     @patch(
         "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
         return_value=TEST_READY_CLUSTER,

Reply via email to