MioNok opened a new issue #22684:
URL: https://github.com/apache/airflow/issues/22684


   ### Apache Airflow version
   
   2.2.4 (latest released)
   
   ### What happened
   
   When triggering a DAG the run goes as expected, but when trying to rerun a Kubernetes task, it either:
   A) tries to run an already completed pod, or
   B) finds many previously completed pods and therefore throws an error.
   
   I have narrowed down the problem to 
_airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py_, more 
specifically the ```find_pod``` function.  As the docstring explains, I think 
the original idea was that it would only return running pods, but in my case it 
does sometimes also return one or many completed pods.
   
    ``` python
        def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
            """Returns an already-running pod for this task instance if one exists."""
            label_selector = self._build_find_pod_label_selector(context)
            pod_list = self.client.list_namespaced_pod(
                namespace=namespace,
                label_selector=label_selector,
            ).items  ########## This pod_list also contains completed pods

            ###### I added some debug print steps to see the pod status: #######
            for pod in pod_list:
                self.log.info(pod.metadata)
                pod_status = self.client.read_namespaced_pod_status(
                    name=pod.metadata.name, namespace=namespace
                )
                self.log.info(f"Pod status is {pod_status}")
                # I think you could perhaps double check from this pod_status
                # whether the pod has completed or whether it is still running.
            #####################################################

            pod = None
            num_pods = len(pod_list)
            if num_pods > 1:
                raise AirflowException(f'More than one pod running with labels {label_selector}')  ### thus making it crash here
            elif num_pods == 1:
                pod = pod_list[0]  ### or trying to use the completed pod here
                self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
                self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
                self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
            return pod
    ```
   
   When looking at the printout of that ```pod_status``` I can see that ```pod_status.status.container_statuses[0].state.terminated.reason``` would be **Completed**.
   I have pasted the pod_status log here too; perhaps there is an easier way to check whether it is completed. At a quick glance, the ```list_namespaced_pod``` call the current function uses does not return the container/pod state, which is why I used ```read_namespaced_pod_status``` in the debug step.
   
   Pod status:
    ``` python
    {'api_version': 'v1',
     'kind': 'Pod',
     'metadata': {<metadata_stuff>},
     'spec': {<spec_stuff>},
     'status': {'conditions': [{'last_probe_time': None,
                                'last_transition_time': datetime.datetime(2022, 4, 1, 6, 44, 13, tzinfo=tzlocal()),
                                'message': None,
                                'reason': 'PodCompleted',
                                'status': 'True',
                                'type': 'Initialized'},
                               {'last_probe_time': None,
                                'last_transition_time': datetime.datetime(2022, 4, 1, 6, 45, 55, tzinfo=tzlocal()),
                                'message': None,
                                'reason': 'PodCompleted',
                                'status': 'False',
                                'type': 'Ready'},
                               {'last_probe_time': None,
                                'last_transition_time': datetime.datetime(2022, 4, 1, 6, 45, 55, tzinfo=tzlocal()),
                                'message': None,
                                'reason': 'PodCompleted',
                                'status': 'False',
                                'type': 'ContainersReady'},
                               {'last_probe_time': None,
                                'last_transition_time': datetime.datetime(2022, 4, 1, 6, 44, 13, tzinfo=tzlocal()),
                                'message': None,
                                'reason': None,
                                'status': 'True',
                                'type': 'PodScheduled'}],
                'container_statuses': [{'container_id': 'docker://-----',
                                        'image': '----',
                                        'image_id': '---',
                                        'last_state': {'running': None,
                                                       'terminated': None,
                                                       'waiting': None},
                                        'name': 'base',
                                        'ready': False,
                                        'restart_count': 0,
                                        'state': {'running': None,
                                                  'terminated': {'container_id': '----',
                                                                 'exit_code': 0,
                                                                 'finished_at': datetime.datetime(2022, 4, 1, 6, 45, 54, tzinfo=tzlocal()),
                                                                 'message': None,
                                                                 'reason': 'Completed',
                                                                 'signal': None,
                                                                 'started_at': datetime.datetime(2022, 4, 1, 6, 44, 16, tzinfo=tzlocal())},
                                                  'waiting': None}}],
                'host_ip': '----',
                'init_container_statuses': None,
                'message': None,
                'nominated_node_name': None,
                'phase': 'Succeeded',
                'pod_ip': '----',
                'qos_class': 'Burstable',
                'reason': None,
                'start_time': datetime.datetime(2022, 4, 1, 6, 44, 13, tzinfo=tzlocal())}}
    ```
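
   For what it's worth, here is a self-contained sketch of the completion check I did in the debug step above. It assumes a plain ```kubernetes.client.CoreV1Api``` client, and the ```pod_is_completed``` helper name is just illustrative, not something from the provider.

   ``` python
   # Sketch of the debug check above: ask the API server for the pod status and
   # inspect the terminated state of its containers. Assumes kubeconfig access;
   # the helper name is illustrative only.
   from kubernetes import client, config

   config.load_kube_config()  # or config.load_incluster_config() inside the cluster
   core_v1 = client.CoreV1Api()

   def pod_is_completed(name: str, namespace: str) -> bool:
       """Return True if any container in the pod has terminated with reason 'Completed'."""
       pod = core_v1.read_namespaced_pod_status(name=name, namespace=namespace)
       for cs in pod.status.container_statuses or []:
           terminated = cs.state.terminated if cs.state else None
           if terminated is not None and terminated.reason == "Completed":
               return True
       return False
   ```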
   
   
   ### What you think should happen instead
   
   If there is no running pods, it should just create a new one.  On our 
installation commenting out that check to use running pods solved our problems. 
We do not have a case when we would like to reuse a pod, but since there is 
this functionality I think a better solution would be to double check that the 
retuned pods are actually running.
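
   A minimal sketch of what I mean, assuming the kubernetes Python client (this is not the provider's actual code, and ```find_running_pods``` is just an illustrative name): filter the result of ```list_namespaced_pod``` so that pods in a terminal phase are ignored before the "more than one pod" check.

   ``` python
   # Sketch of the suggested double-check, not the provider's actual code.
   # 'Succeeded' and 'Failed' are the terminal values of pod.status.phase;
   # the pasted status above shows phase 'Succeeded' for the completed pod.
   from typing import List
   from kubernetes import client

   def find_running_pods(core_v1: client.CoreV1Api, namespace: str, label_selector: str) -> List[client.V1Pod]:
       """List pods matching the task labels, then drop any that have already finished."""
       pods = core_v1.list_namespaced_pod(
           namespace=namespace,
           label_selector=label_selector,
       ).items
       return [p for p in pods if p.status is None or p.status.phase not in ("Succeeded", "Failed")]
   ```

   With something like this in place, a rerun with only completed pods around would fall through to creating a fresh pod instead of reattaching or raising.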
   
   ### How to reproduce
   
   Step 1: Run a DAG with a task that uses the KubernetesPodOperator (the DAG can contain other tasks as well); a minimal example DAG is sketched after these steps.
   Step 2: Once the DAG has finished, try to rerun the task, selecting "ignore task state".
   Optional (Step 2.1): try to rerun the task a second time, again selecting "ignore task state".
   Step 3: It should print out these lines:
     ```python
     self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
     self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
     self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
     ```
     But no pod starts to run on the Kubernetes cluster, since the pod returned is already completed. It seems that the logs are just copied from the previous run, from the ```self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])``` line onwards.
    
     Step 3.1: if you did step 2.1, it should instead raise:
     ```python
     raise AirflowException(f'More than one pod running with labels {label_selector}')
     ```
    
     Quick and dirty fix:
     Step 4: Commenting out line 355, ```pod = pod_list[0]```, fixed the issue for us for step 3.
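
    For completeness, here is a minimal DAG along the lines of Step 1 (the image, ids and schedule below are just placeholders, not what we run in production):

    ``` python
    # Minimal reproduction sketch for Step 1; image and ids are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    with DAG(
        dag_id="kpo_rerun_repro",
        start_date=datetime(2022, 4, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        KubernetesPodOperator(
            task_id="kpo_task",
            name="kpo-task",
            namespace="default",
            image="busybox",
            cmds=["sh", "-c", "echo hello && sleep 5"],
        )
    ```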
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.2.4
   apache-airflow-providers-amazon==3.0.0
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-cncf-kubernetes==3.0.2
   apache-airflow-providers-docker==2.4.1
   apache-airflow-providers-elasticsearch==2.2.0
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-google==6.4.0
   apache-airflow-providers-grpc==2.0.1
   apache-airflow-providers-hashicorp==2.1.1
   apache-airflow-providers-http==2.0.3
   apache-airflow-providers-imap==2.2.0
   apache-airflow-providers-microsoft-azure==3.6.0
   apache-airflow-providers-microsoft-mssql==2.1.3
   apache-airflow-providers-mysql==2.2.0
   apache-airflow-providers-odbc==2.0.1
   apache-airflow-providers-postgres==3.0.0
   apache-airflow-providers-redis==2.0.1
   apache-airflow-providers-sendgrid==2.0.1
   apache-airflow-providers-sftp==2.4.1
   apache-airflow-providers-slack==4.2.0
   apache-airflow-providers-sqlite==2.1.0
   apache-airflow-providers-ssh==2.4.0
   
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker Compose, using the Docker image from Docker Hub, airflow:2.2.4-python3.7.
   
   ### Anything else
   
   This happens every time. This *might* be an issue with our Kubernetes cluster (Rancher 2.x) reporting the pod status in an unexpected way. For our use case we just want to create a new pod on every rerun. Happy to answer any questions.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

