Hi all,

We currently run Airflow as a Deployment in a Kubernetes cluster, with a single Airflow worker using the CeleryExecutor and Redis for messaging (similar to what is outlined in https://github.com/puckel/docker-airflow).
We are trying to run Airflow in a maximally fault-tolerant fashion. Specifically, we'd like to recover when the Airflow worker goes down, for example because the GCE compute instance hosting it is preempted [1]. How can we achieve this? Here is a concrete example scenario:

- Airflow is running with some DAGs in flight and several tasks in the 'running' state.
- The node the worker resides on is preempted (or otherwise restarted), so the jobs it was running are terminated.
- Kubernetes automatically restarts the Airflow worker, but the jobs it was running have been killed and the state of the tasks that were 'running' is lost.
- The lost tasks become zombies and are eventually marked 'failed' [2].
- We'd like to either restore the state of the worker, or retry the jobs it was running *only* in the case of these kinds of disruptions (i.e. node preemptions).
- Note that we don't want to broadly enable task retries, to avoid re-running tasks that failed for legitimate, unrecoverable reasons, e.g. a bug in our task code causing the task to error out.

One resource [3] suggests setting retries on DAG tasks (a rough sketch of what that looks like is appended after the references), but this seems too blunt: a task can fail for reasons like the above (GKE/GCE preemption), or for reasons where it 'should' fail (like a fatal SIGTERM from within the process itself).

In a previous thread [4], the KubernetesExecutor was suggested as being fault-tolerant, but as far as we can tell, all that is stored and recovered across restarts is the resource version and UUID of the worker pods [5]. Together those are enough to recreate the KubernetesJobWatcher [6], but not to recover any disrupted workers or their state, so the KubernetesExecutor would suffer from the same problem in the case of worker disruptions: workers that die with jobs in flight leave zombie tasks that get marked failed [7].

How can we achieve fault tolerance, or at least fine-grained handling of worker failures, in Airflow? (A hypothetical sketch of the kind of per-failure hook we have in mind is also appended below.)

[1] https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms
[2] https://airflow.apache.org/concepts.html#zombies-undeads
[3] https://www.astronomer.io/guides/dag-best-practices/
[4] https://lists.apache.org/thread.html/7a7e1456db51b75666fdec9353453591de962495bdf235733170bb85@%3Cdev.airflow.apache.org%3E
[5] https://github.com/apache/airflow/blob/e1d3df19992c76aff47fa18b9afca8ebf691e1da/airflow/models/kubernetes.py#L28
[6] https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/executors/kubernetes_executor.py#L279
[7] https://github.com/apache/airflow/blob/d2a2e2cbff7c7ac4bbcc47aa06e6cf4eb8ac8f57/airflow/jobs.py#L1791
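
For concreteness, the per-task retries that [3] recommends look roughly like the sketch below (the operator, DAG id and values are placeholders, not our real DAG):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Blanket retries as suggested by [3]: the task is retried on *any*
# failure, whether the worker node was preempted or the task itself
# is broken.
default_args = {
    "owner": "airflow",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "example_blanket_retries",
    default_args=default_args,
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
)

work = BashOperator(task_id="do_work", bash_command="echo work", dag=dag)

This would re-run the preempted case, but it would also re-run tasks that failed for reasons we consider fatal, which is exactly what we want to avoid.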

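To make "fine-grained failure handling" concrete, here is a purely hypothetical, untested sketch of the kind of hook we have in mind: an on_failure_callback that puts a task back up for retry only when its worker node looks to have been preempted. node_was_preempted is a placeholder we would have to implement ourselves (e.g. by querying GCE/GKE for preemption events on the node that hosted the worker), and we haven't verified that the scheduler's zombie handling would actually honour a state change made this way:

from airflow.utils.db import provide_session
from airflow.utils.state import State


def node_was_preempted(hostname):
    # Placeholder: look up preemption/termination events for the node
    # (or pod) that was running this task instance.
    raise NotImplementedError


@provide_session
def retry_only_if_preempted(context, session=None):
    # on_failure_callback: re-queue the task only for infrastructure
    # disruptions; leave genuine task failures as 'failed'.
    ti = context["task_instance"]
    if node_was_preempted(ti.hostname):
        ti.state = State.UP_FOR_RETRY
        session.merge(ti)

We imagine attaching something like this via on_failure_callback on the tasks (or default_args) we care about. If there is a supported way to get this behaviour, or a better place to hook in, we'd love to hear it.

Thanks in advance for any pointers.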