Personally, I'd find it really valuable if Airflow tasks could distinguish between a reported executor failure, an implied executor failure via timeout, and an application-level failure code returned by the executor. It's frustrating to have to put retries on most tasks just to cope with the possibility of the executor failing, even when an application-level failure isn't recoverable.
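
For reference, the blanket-retry workaround looks roughly like this. A minimal sketch against the 1.10-era API; the DAG id, task id and command are made up:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    # Retries added defensively to survive executor/worker loss; they also
    # re-run tasks that failed for legitimate, unrecoverable reasons.
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='blanket_retry_example',       # hypothetical DAG id
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    run_job = BashOperator(
        task_id='run_job',
        bash_command='echo "running my job"',  # stand-in for the real work
    )

The retries fire for executor/worker loss and for genuine application errors alike, which is exactly the problem.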
On Tue, Apr 23, 2019 at 5:21 PM Kevin Lam <[email protected]> wrote:

> Hi all,
>
> We currently run Airflow as a Deployment in a Kubernetes cluster. We run a
> single Airflow worker with the CeleryExecutor and Redis for messaging
> (similar to what is outlined in https://github.com/puckel/docker-airflow).
>
> We are trying to use Airflow in a maximally fault-tolerant fashion.
> Specifically, we'd like to recover when the Airflow worker goes down, such
> as due to the GCE compute instance it's hosted on being pre-empted [1].
> How can we achieve this? See below for a specific example scenario:
>
> - Airflow is running with some DAGs in flight, with several tasks in the
>   'running' state.
> - The node that the worker resides on is pre-empted (or otherwise
>   restarted), resulting in running jobs getting terminated.
> - The Airflow worker is automatically restarted by Kubernetes, but all the
>   jobs it was running are terminated and the states of the tasks that were
>   'running' are lost.
> - Any lost tasks from the jobs are zombies and eventually get marked
>   'failed' [2].
> - We'd like to either restore the state of the worker, or retry the jobs
>   it was running ***only*** in the case of these kinds of disruptions (i.e.
>   node preemptions).
> - Note that we don't want to broadly enable task retries, to avoid
>   re-running tasks for legitimate failures that cannot be recovered from,
>   e.g. a failure within our running task code causing the task to error
>   out.
>
> One resource [3] suggests retries on DAG tasks, but this seems subpar,
> since a task can fail for reasons like the above (GKE/GCE preemption), or
> for reasons where it 'should' fail (like a fatal sigterm thrown by the
> process).
>
> In a previous thread [4], the KubernetesExecutor was suggested as being
> fault-tolerant, but, so far as we can tell, all that is stored and
> recovered upon restarts is the resource versions and UUIDs of the worker
> pods [5]. These two data combined are enough to reproduce the
> KubernetesJobWatcher [6], but not enough to reproduce any disrupted workers
> and their state, and the KubernetesExecutor would suffer from the same
> issue above in the case of worker disruptions: workers dying with running
> jobs will create zombie tasks that are marked failed [7].
>
> How can we achieve fault tolerance, or at least fine-grained failure
> handling for worker failures, in Airflow?
>
> [1] https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms
> [2] https://airflow.apache.org/concepts.html#zombies-undeads
> [3] https://www.astronomer.io/guides/dag-best-practices/
> [4] https://lists.apache.org/thread.html/7a7e1456db51b75666fdec9353453591de962495bdf235733170bb85@%3Cdev.airflow.apache.org%3E
> [5] https://github.com/apache/airflow/blob/e1d3df19992c76aff47fa18b9afca8ebf691e1da/airflow/models/kubernetes.py#L28
> [6] https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/executors/kubernetes_executor.py#L279
> [7] https://github.com/apache/airflow/blob/d2a2e2cbff7c7ac4bbcc47aa06e6cf4eb8ac8f57/airflow/jobs.py#L1791
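
One workaround we've sketched but not battle-tested: keep retries at 0 and selectively re-queue from an on_failure_callback when the error looks infrastructure-related. This is a rough sketch only; it assumes the callback still fires when the scheduler reaps a zombie, that matching on the exception text is a good-enough signal, and that bumping max_tries plus setting UP_FOR_RETRY is enough for the scheduler to pick the task up again. The marker strings and function name are ours.

from airflow import settings
from airflow.utils.state import State

# Heuristic markers for "the worker/executor died", not "the task's code failed".
INFRA_FAILURE_MARKERS = ('zombie', 'celery command failed', 'externally killed')

def requeue_on_infra_failure(context):
    """on_failure_callback: grant one extra attempt only for infrastructure-looking failures."""
    exc = context.get('exception')
    if exc is None or not any(m in str(exc).lower() for m in INFRA_FAILURE_MARKERS):
        return  # looks like a genuine application failure, leave it failed

    ti = context['task_instance']
    session = settings.Session()
    try:
        ti.max_tries = ti.try_number   # allow exactly one more attempt
        ti.state = State.UP_FOR_RETRY  # hand the task back to the scheduler
        session.merge(ti)
        session.commit()
    finally:
        session.close()

You'd then attach it to tasks via default_args={'on_failure_callback': requeue_on_infra_failure, 'retries': 0}. Not pretty, which is why first-class handling of worker loss would be so welcome.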
