kmacdonald76 opened a new issue, #48369: URL: https://github.com/apache/airflow/issues/48369
### Apache Airflow Provider(s)

apache-flink

### Versions of Apache Airflow Providers

apache-airflow-providers-apache-flink==1.6.0

### Apache Airflow version

2.10.5

### Operating System

Debian GNU/Linux 12 (bookworm)

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

Deployed on a local minikube instance
Executor: KubernetesExecutor
Flink Operator repo: https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

### What happened

The FlinkKubernetesSensor fails when it tries to look up the Flink TaskManager pod to attach its logs:

```
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py", line 127, in poke
    self._log_driver(application_state, response)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py", line 95, in _log_driver
    for task_manager in all_pods.items:
                        ^^^^^^^^^^^^^^
AttributeError: 'HTTPResponse' object has no attribute 'items'
```

### What you think should happen instead

Idea 1) The call that lists pods in `FlinkKubernetesSensor._log_driver()` could use a different Kubernetes API call that searches across all namespaces: https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_pod_for_all_namespaces

Currently, the namespace argument to [get_namespaced_pod_list() is hardcoded](https://github.com/apache/airflow/blob/main/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py#L91) to the "default" namespace:

```
all_pods = self.hook.get_namespaced_pod_list(
    namespace="default", watch=False, label_selector=task_manager_labels
)
```

[This hook](https://github.com/apache/airflow/blob/main/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L460) calls the [list_namespaced_pod() API function](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod), which, as I understand it, only searches the "default" namespace for Flink TaskManager pods.

Idea 2) Use the sensor's existing `namespace` parameter when looking for TaskManager pods (a rough sketch is included at the end of "Anything else" below).

Idea 3) Add a separate namespace argument for TaskManager pods. Perhaps users want the flexibility of deploying TaskManager pods in a different namespace than JobManager pods?

### How to reproduce

Deploy a DAG that contains two tasks:

```
from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator
from airflow.providers.apache.flink.sensors.flink_kubernetes import FlinkKubernetesSensor

deployment_yaml = ...  # FlinkDeployment manifest, elided

t1 = FlinkKubernetesOperator(
    task_id="submit_flink_job",
    application_file=deployment_yaml,
    namespace="flink",
    api_group="flink.apache.org",
    api_version="v1beta1",
)

t2 = FlinkKubernetesSensor(
    task_id="monitor_flink_job",
    application_name=deployment_yaml["metadata"]["name"],
    namespace="flink",
    api_group="flink.apache.org",
    api_version="v1beta1",
    attach_log=True,
)

t1 >> t2
```

- The first task successfully deploys the Kubernetes deployment: the JobManager launches in the k8s "flink" namespace, and the TaskManager pod then launches in the same "flink" namespace.
- The FlinkKubernetesSensor task successfully detects the job state change of the JobManager.
- The FlinkKubernetesSensor then fails to find the TaskManager pod, resulting in the failure mentioned above.

### Anything else

Perhaps I'm not following some best practice for Kubernetes Flink deployments where TaskManager pods should always be in the "default" namespace; I'm still fairly new to Apache Flink.

The current workaround is to disable the `attach_log` feature:

```
t2 = FlinkKubernetesSensor(
    ...
    attach_log=False,
)
```
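For reference, a minimal sketch of what Idea 2 might look like inside `_log_driver()`. This assumes the sensor already stores its constructor's `namespace` argument as `self.namespace`, and that `task_manager_labels` is built earlier in the method as in the current code; it is a sketch of the idea, not a tested patch:

```
# Sketch (Idea 2): list TaskManager pods in the namespace the sensor was
# constructed with, instead of the hardcoded "default".
all_pods = self.hook.get_namespaced_pod_list(
    namespace=self.namespace,  # was: namespace="default"
    watch=False,
    label_selector=task_manager_labels,
)
for task_manager in all_pods.items:
    ...  # existing log-attaching logic, unchanged
```

With this change, the reproduction DAG above would look for TaskManager pods in the "flink" namespace, matching where the FlinkKubernetesOperator deployed them.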
### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
