Hi all,

We currently run Airflow as a Deployment in a Kubernetes cluster. We run a
single Airflow worker with the CeleryExecutor and Redis for messaging
(similar to what is outlined in https://github.com/puckel/docker-airflow).
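
For reference, a minimal sketch of our executor configuration in
environment-variable form (the broker and result-backend URLs below are
illustrative placeholders, not our actual endpoints):

    # Use the Celery executor with Redis as the message broker
    AIRFLOW__CORE__EXECUTOR=CeleryExecutor
    AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/1
    # Task results/state are persisted in the metadata database
    AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow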

We are trying to run Airflow in a maximally fault-tolerant fashion.
Specifically, we'd like to recover when the Airflow worker goes down, for
example because the GCE compute instance it is hosted on is preempted [1].
How can we achieve this? See below for a specific example scenario:


   - Airflow is running with some DAGs in flight, with several tasks in the
   'running' state
   - The node that the worker resides on is preempted (or otherwise
   restarted), resulting in the running jobs getting terminated
   - The Airflow worker is automatically restarted by Kubernetes, but all
   the jobs it was running are terminated and the states of the tasks that
   were 'running' are lost
   - Any lost tasks from those jobs become zombies and are eventually marked
   'failed' [2] (see the config sketch after this list)
   - We'd like to either restore the state of the worker, or retry the jobs
   it was running ***only*** in the case of these kinds of disruptions
   (i.e., node preemptions).
   - Note that we don't want to broadly enable task retries, to avoid
   re-running tasks for legitimate failures that cannot be recovered from,
   e.g., a failure within our task code causing the task to error out.
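
As an aside, our understanding is that the delay before such hung tasks are
reaped as zombies is governed by a scheduler setting; a sketch of the
relevant airflow.cfg entry (the value shown is the documented default, not
something we have tuned):

    [scheduler]
    # Task instances whose job has not heartbeated within this many
    # seconds are treated as zombies and handled as failures.
    scheduler_zombie_task_threshold = 300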


One resource [3] suggests setting retries on DAG tasks, but this seems
subpar: a task can fail for reasons like the above (GKE/GCE preemption),
where a retry is appropriate, or for reasons where it 'should' fail (such
as a fatal SIGTERM raised by the process itself), where a blind retry is
not.
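
Concretely, that suggestion amounts to something like the following (a
minimal sketch; the DAG id, schedule, and retry values are illustrative),
where the retries apply to every failure rather than only to worker
disruptions:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2019, 1, 1),
        # Blanket retries: these fire on *any* task failure, including
        # genuine errors in our task code, not just node preemptions.
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    with DAG('example_blanket_retries', default_args=default_args,
             schedule_interval='@daily') as dag:
        do_work = BashOperator(task_id='do_work', bash_command='echo working')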

In a previous thread [4], the KubernetesExecutor was suggested as being
fault-tolerant, but as far as we can tell, all that is stored and recovered
upon restart is the resource versions and UUIDs of the worker pods [5].
Those two pieces of data are enough to reconstruct the
KubernetesJobWatcher [6], but not to recover any disrupted workers and
their state, so the KubernetesExecutor would suffer from the same issue
described above in the case of worker disruptions: workers dying with
running jobs produce zombie tasks that are marked failed [7].

How can we achieve fault tolerance, or at least fine-grained handling of
worker failures, in Airflow?

[1] https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms

[2] https://airflow.apache.org/concepts.html#zombies-undeads

[3] https://www.astronomer.io/guides/dag-best-practices/

[4] https://lists.apache.org/thread.html/7a7e1456db51b75666fdec9353453591de962495bdf235733170bb85@%3Cdev.airflow.apache.org%3E

[5] https://github.com/apache/airflow/blob/e1d3df19992c76aff47fa18b9afca8ebf691e1da/airflow/models/kubernetes.py#L28

[6] https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/executors/kubernetes_executor.py#L279

[7] https://github.com/apache/airflow/blob/d2a2e2cbff7c7ac4bbcc47aa06e6cf4eb8ac8f57/airflow/jobs.py#L1791
