This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9c9225a9452 added both different namespace for taskmanager or for global namespace (#53653)
9c9225a9452 is described below
commit 9c9225a945299b8501c65acd3498e92cea46e3a8
Author: Nataneljpwd <[email protected]>
AuthorDate: Mon Jul 28 10:48:17 2025 +0300
added both different namespace for taskmanager or for global namespace (#53653)
* added both different namespace for taskmanager or for global namespace
* added tests
---------
Co-authored-by: Natanel Rudyuklakir <[email protected]>
---
.../apache/flink/sensors/flink_kubernetes.py | 6 +-
.../apache/flink/sensors/test_flink_kubernetes.py | 70 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py b/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py
index ad0cc1cfa54..4d86aa3d5ae 100644
--- a/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py
+++ b/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py
@@ -62,6 +62,7 @@ class FlinkKubernetesSensor(BaseSensorOperator):
api_group: str = "flink.apache.org",
api_version: str = "v1beta1",
plural: str = "flinkdeployments",
+ taskmanager_pods_namespace: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -73,6 +74,7 @@ class FlinkKubernetesSensor(BaseSensorOperator):
self.api_group = api_group
self.api_version = api_version
self.plural = plural
+ self.taskmanager_pods_namespace = taskmanager_pods_namespace
def _log_driver(self, application_state: str, response: dict) -> None:
log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info
@@ -88,7 +90,9 @@ class FlinkKubernetesSensor(BaseSensorOperator):
task_manager_labels = status_info["taskManager"]["labelSelector"]
all_pods = self.hook.get_namespaced_pod_list(
- namespace="default", watch=False, label_selector=task_manager_labels
+ namespace=self.taskmanager_pods_namespace or self.namespace or "default",
+ watch=False,
+ label_selector=task_manager_labels,
)
namespace = response["metadata"]["namespace"]
diff --git a/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py b/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py
index 0df129be503..cbaf52e0aca 100644
--- a/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py
+++ b/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py
@@ -1140,6 +1140,76 @@ class TestFlinkKubernetesSensor:
name="flink-stream-example",
)
+ @patch(
+ "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
+ return_value=TEST_READY_CLUSTER,
+ )
+ @patch("logging.Logger.info")
+ @patch(
+ "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs",
+ return_value=TEST_POD_LOGS,
+ )
+ @patch(
+ "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_namespaced_pod_list",
+ return_value=TASK_MANAGER_POD_LIST,
+ )
+ def test_logging_taskmanager_from_taskmanager_namespace_when_namespace_is_set(
+ self, mock_namespaced_pod_list, mock_pod_logs, info_log_call, mock_namespaced_crd, mock_kube_conn
+ ):
+ namespace = "different-namespace123456"
+ namespae_name = "test123"
+
+ sensor = FlinkKubernetesSensor(
+ application_name="flink-stream-example",
+ namespace=namespace,
+ taskmanager_pods_namespace=namespae_name,
+ attach_log=True,
+ dag=self.dag,
+ task_id="test_task_id",
+ )
+
+ sensor.poke(context=None)
+
+ mock_namespaced_pod_list.assert_called_once_with(
+ namespace=namespae_name,
+ watch=False,
+ label_selector="component=taskmanager,app=flink-stream-example",
+ )
+
+ @patch(
+ "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
+ return_value=TEST_READY_CLUSTER,
+ )
+ @patch("logging.Logger.info")
+ @patch(
+ "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs",
+ return_value=TEST_POD_LOGS,
+ )
+ @patch(
+ "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_namespaced_pod_list",
+ return_value=TASK_MANAGER_POD_LIST,
+ )
+ def test_logging_taskmanager_from_non_default_namespace(
+ self, mock_namespaced_pod_list, mock_pod_logs, info_log_call, mock_namespaced_crd, mock_kube_conn
+ ):
+ namespae_name = "test123"
+
+ sensor = FlinkKubernetesSensor(
+ application_name="flink-stream-example",
+ namespace=namespae_name,
+ attach_log=True,
+ dag=self.dag,
+ task_id="test_task_id",
+ )
+
+ sensor.poke(context=None)
+
+ mock_namespaced_pod_list.assert_called_once_with(
+ namespace=namespae_name,
+ watch=False,
+ label_selector="component=taskmanager,app=flink-stream-example",
+ )
+
@patch(
"kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
return_value=TEST_READY_CLUSTER,