MioNok opened a new issue #22684:
URL: https://github.com/apache/airflow/issues/22684
### Apache Airflow version
2.2.4 (latest released)
### What happened
When triggering a dag the run goes as expected, but when trying to rerun a
kubernetes task, it either:
A) tries to run an already completed pod, or
B) finds many previously completed pods and throws an error.
I have narrowed down the problem to
_airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py_, more
specifically the ```find_pod``` function. As the docstring explains, I think
the original idea was that it would only return running pods, but in my case it
sometimes also returns one or more completed pods.
``` python
def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
    """Returns an already-running pod for this task instance if one exists."""
    label_selector = self._build_find_pod_label_selector(context)
    pod_list = self.client.list_namespaced_pod(
        namespace=namespace,
        label_selector=label_selector,
    ).items  ########## This pod_list also contains completed pods

    ###### I added some debug print steps to see the pod status: #######
    for pod in pod_list:
        self.log.info(pod.metadata)
        pod_status = self.client.read_namespaced_pod_status(
            name=pod.metadata.name, namespace=namespace
        )
        self.log.info(f"Pod status is {pod_status}")
        # I think you could perhaps double-check from this pod_status
        # whether the pod has completed or is still running.
    #####################################################

    pod = None
    num_pods = len(pod_list)
    if num_pods > 1:
        raise AirflowException(
            f'More than one pod running with labels {label_selector}'
        )  ### thus making it crash here
    elif num_pods == 1:
        pod = pod_list[0]  ### or trying to use the completed pod here
        self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
        self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
        self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
    return pod
```
When looking at the printout of that ```pod_status```, I can see that
```pod_status.status.container_statuses[0].state.terminated.reason``` would be
**Completed**.
I have pasted the pod_status log here too. Perhaps there is an easier way to
check whether the pod has completed; at a quick glance, the
```list_namespaced_pod``` call the current function uses does not return the
container/pod state, which is why I used ```read_namespaced_pod_status``` in
the debug step.
Pod status:
``` json
{'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {<metadata_stuff>},
 'spec': {<spec_stuff>},
 'status': {'conditions': [{'last_probe_time': None,
                            'last_transition_time': datetime.datetime(2022, 4, 1, 6, 44, 13, tzinfo=tzlocal()),
                            'message': None,
                            'reason': 'PodCompleted',
                            'status': 'True',
                            'type': 'Initialized'},
                           {'last_probe_time': None,
                            'last_transition_time': datetime.datetime(2022, 4, 1, 6, 45, 55, tzinfo=tzlocal()),
                            'message': None,
                            'reason': 'PodCompleted',
                            'status': 'False',
                            'type': 'Ready'},
                           {'last_probe_time': None,
                            'last_transition_time': datetime.datetime(2022, 4, 1, 6, 45, 55, tzinfo=tzlocal()),
                            'message': None,
                            'reason': 'PodCompleted',
                            'status': 'False',
                            'type': 'ContainersReady'},
                           {'last_probe_time': None,
                            'last_transition_time': datetime.datetime(2022, 4, 1, 6, 44, 13, tzinfo=tzlocal()),
                            'message': None,
                            'reason': None,
                            'status': 'True',
                            'type': 'PodScheduled'}],
            'container_statuses': [{'container_id': 'docker://-----',
                                    'image': '----',
                                    'image_id': '---',
                                    'last_state': {'running': None,
                                                   'terminated': None,
                                                   'waiting': None},
                                    'name': 'base',
                                    'ready': False,
                                    'restart_count': 0,
                                    'state': {'running': None,
                                              'terminated': {'container_id': '----',
                                                             'exit_code': 0,
                                                             'finished_at': datetime.datetime(2022, 4, 1, 6, 45, 54, tzinfo=tzlocal()),
                                                             'message': None,
                                                             'reason': 'Completed',
                                                             'signal': None,
                                                             'started_at': datetime.datetime(2022, 4, 1, 6, 44, 16, tzinfo=tzlocal())},
                                              'waiting': None}}],
            'host_ip': '----',
            'init_container_statuses': None,
            'message': None,
            'nominated_node_name': None,
            'phase': 'Succeeded',
            'pod_ip': '----',
            'qos_class': 'Burstable',
            'reason': None,
            'start_time': datetime.datetime(2022, 4, 1, 6, 44, 13, tzinfo=tzlocal())}}
```
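For reference, this is a rough sketch of the kind of check I mean, based on the status object above (```pod_is_reusable``` is just a hypothetical helper name, not anything in the provider):
``` python
from kubernetes.client import models as k8s


def pod_is_reusable(pod: k8s.V1Pod) -> bool:
    """Rough sketch: treat only pending/running pods as reusable.

    In the dump above the pod's phase is 'Succeeded' and the container's
    terminated.reason is 'Completed', so a check like this would reject it.
    """
    return pod.status is not None and pod.status.phase in ("Pending", "Running")
```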
### What you think should happen instead
If there are no running pods, it should just create a new one. On our
installation, commenting out the check that reuses running pods solved our
problems. We do not have a case where we would want to reuse a pod, but since
this functionality exists, I think a better solution would be to double-check
that the returned pods are actually running (a sketch of such a check follows
below).
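For illustration only, an untested sketch of how ```find_pod``` could do that double check before the length check:
``` python
# Untested sketch of a possible adjustment inside find_pod:
pod_list = self.client.list_namespaced_pod(
    namespace=namespace,
    label_selector=label_selector,
).items

# Drop pods whose phase is Succeeded or Failed, such as the 'Completed'
# pod shown in the status dump above, so only pending/running pods remain.
pod_list = [p for p in pod_list if p.status and p.status.phase in ("Pending", "Running")]
```
Alternatively, the filtering could perhaps be pushed to the API server with a field selector such as ```field_selector='status.phase!=Succeeded,status.phase!=Failed'``` on the ```list_namespaced_pod``` call, but I have not tested that.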
### How to reproduce
Step 1: Run a dag with a task that uses the KubernetesPodOperator (the dag can
contain other tasks as well; a minimal example dag is sketched after these steps).
Step 2: Once the dag has finished, rerun the task with "ignore task state"
selected.
Optional (Step 2.1: rerun the task again with "ignore task state" selected.)
Step 3: It should print out these lines:
```python
self.log.info("Found matching pod %s with labels %s", pod.metadata.name,
pod.metadata.labels)
self.log.info("`try_number` of task_instance: %s",
context['ti'].try_number)
self.log.info("`try_number` of pod: %s",
pod.metadata.labels['try_number'])
```
But no pod starts to run on the kubernetes cluster, since the returned pod is
already completed. It seems that the logs from the
```self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])```
line onwards are just copied from the previous run.
Step 3.1: If you did step 2.1, it should print out this:
```python
raise AirflowException(f'More than one pod running with labels {label_selector}')
```
Quick and dirty fix:
Step 4: Commenting out line 355, ```pod = pod_list[0]```, fixed the issue from
step 3 for us.
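Something like this minimal dag should be enough for step 1 (a hypothetical sketch, not the exact dag we run; the dag id, image, and namespace are placeholders):
``` python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    dag_id="kpo_rerun_repro",          # placeholder name
    start_date=datetime(2022, 4, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    KubernetesPodOperator(
        task_id="kpo_task",
        name="kpo-task",
        namespace="default",           # adjust to your cluster
        image="busybox:latest",        # any small image works
        cmds=["sh", "-c", "echo hello && sleep 10"],
        is_delete_operator_pod=False,  # leave the completed pod in place so find_pod can match it
    )
```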
### Operating System
Debian GNU/Linux 10 (buster)
### Versions of Apache Airflow Providers
apache-airflow==2.2.4
apache-airflow-providers-amazon==3.0.0
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-cncf-kubernetes==3.0.2
apache-airflow-providers-docker==2.4.1
apache-airflow-providers-elasticsearch==2.2.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-google==6.4.0
apache-airflow-providers-grpc==2.0.1
apache-airflow-providers-hashicorp==2.1.1
apache-airflow-providers-http==2.0.3
apache-airflow-providers-imap==2.2.0
apache-airflow-providers-microsoft-azure==3.6.0
apache-airflow-providers-microsoft-mssql==2.1.3
apache-airflow-providers-mysql==2.2.0
apache-airflow-providers-odbc==2.0.1
apache-airflow-providers-postgres==3.0.0
apache-airflow-providers-redis==2.0.1
apache-airflow-providers-sendgrid==2.0.1
apache-airflow-providers-sftp==2.4.1
apache-airflow-providers-slack==4.2.0
apache-airflow-providers-sqlite==2.1.0
apache-airflow-providers-ssh==2.4.0
### Deployment
Docker-Compose
### Deployment details
Docker Compose, using the Docker image from Docker Hub,
airflow:2.2.4-python3.7.
### Anything else
This happens every time. This *might* be an issue with our kubernetes
cluster (Rancher 2.x) reporting the pod status in an unexpected way. For our
use case we just want to create a new pod on every rerun. Happy to answer any
questions.
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)