[
https://issues.apache.org/jira/browse/AIRFLOW-5589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943462#comment-16943462
]
Daniel Cooper commented on AIRFLOW-5589:
----------------------------------------
I've implemented a fix for this in our extension of the k8s pod operator and
ported it to the v1-10-stable branch:
[https://github.com/danccooper/airflow/commit/26424705ac368056ff0d66a2f37337aa533dec86]
I haven't tested this in the Airflow project itself, but I wanted to make it
available for others who need it.
There is at least one thing to tidy up (the use of the 'private' method
_monitor_pod() from PodLauncher), and probably more.
Hopefully I will soon have time to get a local Airflow environment running and
polish the contribution. If anyone else wants to pick this up and continue,
please do.
> KubernetesPodOperator: Duplicate pods created on worker restart
> ---------------------------------------------------------------
>
> Key: AIRFLOW-5589
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5589
> Project: Apache Airflow
> Issue Type: Bug
> Components: worker
> Affects Versions: 1.10.4, 1.10.5
> Reporter: Daniel Cooper
> Assignee: Daniel Cooper
> Priority: Major
>
> KubernetesPodOperator holds state within the execute function that monitors
> the running pod. If a worker restarts for any reason (pod death, pod shuffle,
> upgrade, etc.), this state is lost.
> At this point the scheduler notices (after waiting for the maximum heartbeat
> interval) that the task is now a 'zombie' (not monitored) and reschedules it.
> The new worker has no knowledge of the existing running pod and so creates a
> duplicate pod. In extreme cases this can lead to many duplicate pods running
> concurrently for the same task.
> I believe this is the problem Nicholas Brenwald (King) described as having
> when running k8s pod operator on Google Composer (at the September meetup at
> King).
> My fix is to add enough labels to uniquely identify a running pod as
> belonging to a given task instance (dag_id, task_id, run_id). We then do a
> namespaced list of pods from k8s with a label selector and monitor the
> existing pod if there is one; otherwise we create a new one as normal.
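
For anyone picking this up, the label-and-lookup idea described above can be
sketched roughly as follows. This is a minimal illustration, not the code from
the linked commit: the helper names (make_safe_label_value, task_labels,
label_selector, find_or_create_pod) and the create_pod callback are
hypothetical, and the client argument is assumed to behave like the Kubernetes
Python client's CoreV1Api, whose list_namespaced_pod() accepts a
label_selector keyword and returns an object with an items list. Label values
are sanitised because Kubernetes limits them to 63 characters from a
restricted character set, which a typical run_id does not satisfy.

```python
import hashlib
import re

MAX_LABEL_LEN = 63  # Kubernetes limit on the length of a label value


def make_safe_label_value(value):
    """Hypothetical helper: coerce a string into a valid k8s label value."""
    safe = re.sub(r"[^A-Za-z0-9_.-]", "_", value)
    safe = safe.strip("_.-")  # must start and end with an alphanumeric
    if safe != value or len(safe) > MAX_LABEL_LEN:
        # Append a short hash of the original so distinct inputs stay distinct.
        digest = hashlib.sha256(value.encode("utf-8")).hexdigest()[:9]
        prefix = safe[: MAX_LABEL_LEN - len(digest) - 1]
        safe = "{}-{}".format(prefix, digest) if prefix else digest
    return safe


def task_labels(dag_id, task_id, run_id):
    """Labels that identify the pod belonging to one task instance."""
    return {
        "dag_id": make_safe_label_value(dag_id),
        "task_id": make_safe_label_value(task_id),
        "run_id": make_safe_label_value(run_id),
    }


def label_selector(labels):
    """Build an equality-based selector such as 'dag_id=a,run_id=b'."""
    return ",".join("{}={}".format(k, v) for k, v in sorted(labels.items()))


def find_or_create_pod(client, namespace, labels, create_pod):
    """Return (pod, created): reuse a matching pod or create a new one.

    `client` is assumed to expose list_namespaced_pod() like CoreV1Api;
    `create_pod` is a hypothetical zero-argument callable that launches a
    pod carrying `labels` and returns it.
    """
    pods = client.list_namespaced_pod(
        namespace, label_selector=label_selector(labels)
    ).items
    if len(pods) > 1:
        raise RuntimeError("More than one pod matches {}".format(labels))
    if pods:
        return pods[0], False  # existing pod: monitor it, do not relaunch
    return create_pod(), True
```

The caller would then monitor whichever pod is returned, so a rescheduled
task attaches to the pod that is already running instead of launching a
duplicate. Raising when several pods match is only one possible policy;
cleaning up the extras may be preferable in practice.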