kmacdonald76 opened a new issue, #48369:
URL: https://github.com/apache/airflow/issues/48369

   ### Apache Airflow Provider(s)
   
   apache-flink
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-flink==1.6.0
   
   ### Apache Airflow version
   
   2.10.5
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Deployed on local minikube instance
   Executor: KubernetesExecutor
   Flink Operator Repo: 
https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
   
   
   ### What happened
   
   The FlinkKubernetesSensor fails while listing the Flink TaskManager pods to attach their logs:
   
   ```
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py",
 line 127, in poke
       self._log_driver(application_state, response)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py",
 line 95, in _log_driver
       for task_manager in all_pods.items:
                           ^^^^^^^^^^^^^^
   AttributeError: 'HTTPResponse' object has no attribute 'items'
   ```
   
   ### What you think should happen instead
   
   Idea 1)
   
   The call that lists pods in FlinkKubernetesSensor._log_driver() could use a different Kubernetes API call that returns pods across all namespaces:
   
   
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_pod_for_all_namespaces
   
   However, the `namespace` argument to [get_namespaced_pod_list() is hardcoded](https://github.com/apache/airflow/blob/main/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py#L91) to "default":
   
   ```python
   all_pods = self.hook.get_namespaced_pod_list(
       namespace="default", watch=False, label_selector=task_manager_labels
   )
   ```
   
   [This hook](https://github.com/apache/airflow/blob/main/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L460) calls the [list_namespaced_pod() API function](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod), which, as I understand it, will only look for Flink TaskManager pods in the "default" namespace.
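
   To illustrate why the hardcoded namespace misses the pods, here is a toy, self-contained sketch (plain Python, not the actual kubernetes client; pod names and labels are made up) of namespace-scoped vs. all-namespaces listing:

   ```python
   # Toy illustration: a TaskManager pod deployed in the "flink" namespace
   # is invisible to a listing scoped to "default".
   pods = [
       {"name": "example-taskmanager-1", "namespace": "flink",
        "labels": {"component": "taskmanager"}},
   ]

   def list_namespaced(pods, namespace):
       # Mirrors list_namespaced_pod(): only pods in the given namespace.
       return [p for p in pods if p["namespace"] == namespace]

   def list_all_namespaces(pods):
       # Mirrors list_pod_for_all_namespaces(): pods from every namespace.
       return list(pods)

   print(len(list_namespaced(pods, "default")))  # 0 -> sensor finds nothing
   print(len(list_all_namespaces(pods)))         # 1 -> TaskManager pod found
   ```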
   
   Idea 2)
   
   Utilize the existing `namespace` argument of the FlinkKubernetesSensor when looking for TaskManager pods.
   
   Idea 3)
   
   Provide an additional namespace argument specifically for TaskManager pods. Perhaps users want the flexibility to deploy TaskManager pods in a different namespace than the JobManager pods?
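
   A minimal sketch of how Ideas 2 and 3 could combine (the function and parameter names here are invented for illustration, not the provider's actual API):

   ```python
   # Hypothetical namespace resolution for the TaskManager pod lookup:
   # an explicit TaskManager namespace wins, then the sensor's own
   # `namespace` argument, then the current "default" behavior.
   def resolve_taskmanager_namespace(task_manager_namespace=None,
                                     sensor_namespace=None):
       return task_manager_namespace or sensor_namespace or "default"

   print(resolve_taskmanager_namespace(sensor_namespace="flink"))  # flink
   ```

   This would keep today's behavior for users who deploy to "default" while fixing the case reported here.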
   
   ### How to reproduce
   
   Deploy a DAG that contains two tasks:
   
   ```python
   deployment_yaml = ...

   t1 = FlinkKubernetesOperator(
       task_id="submit_flink_job",
       application_file=deployment_yaml,
       namespace="flink",
       api_group="flink.apache.org",
       api_version="v1beta1",
   )

   t2 = FlinkKubernetesSensor(
       task_id="monitor_flink_job",
       application_name=deployment_yaml["metadata"]["name"],
       namespace="flink",
       api_group="flink.apache.org",
       api_version="v1beta1",
       attach_log=True,
   )
   t1 >> t2
   ```
         
   - The first task successfully deploys the kubernetes deployment: the job 
manager launches in the k8s "flink" namespace, and then the task manager pod 
also launches in the "flink" namespace.
   - The FlinkKubernetesSensor task successfully detects the job state change 
of the job manager.
   - The FlinkKubernetesSensor then fails to find the TaskManager pod, resulting in the failure shown above.
   
   ### Anything else
   
   Perhaps I'm not following some best practice for Kubernetes Flink deployments where TaskManager pods should always be in the "default" namespace - I'm still fairly new to Apache Flink.
   
   The current workaround is to disable the `attach_log` feature:
   
   ```python
   t2 = FlinkKubernetesSensor(
       ...
       attach_log=False
   )
   ```
   
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

