Personally I'd find it really valuable if Airflow tasks could distinguish
between a reported executor failure, an executor failure implied by a
timeout, and an application-level failure code returned by the executor.
It's frustrating to have to put retries on most tasks just to cope with the
possibility of the executor failing, even when an application-level failure
isn't recoverable.
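For now, the closest workaround I know of is to set retries on the task (to
absorb executor/infrastructure failures) and raise AirflowFailException from
the application code for errors we know aren't recoverable, so those don't
burn retries. A minimal sketch of that pattern follows; the DAG id, task id,
and business-logic stub are made up, and AirflowFailException is only
available in newer Airflow releases (and the operator import path differs in
2.x), so check your version:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python_operator import PythonOperator  # airflow.operators.python in 2.x


def run_business_logic():
    # Stub standing in for the real application code (hypothetical).
    pass


def process_data():
    try:
        run_business_logic()
    except ValueError as err:
        # A failure we know is not recoverable: fail immediately, skip retries.
        raise AirflowFailException("Unrecoverable application error: %s" % err)
    # Anything else that kills the task (e.g. the worker node being preempted
    # and the task later reaped as a zombie) falls through to the retry
    # policy configured below.


with DAG(
    dag_id="retry_only_infra_failures",      # hypothetical DAG id
    start_date=datetime(2019, 4, 1),
    schedule_interval=None,
    default_args={
        "retries": 3,                        # absorbs executor/node failures
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    PythonOperator(task_id="process_data", python_callable=process_data)

It's still a blunt instrument, since retries also fire for ordinary
exceptions you forget to translate into AirflowFailException, which is why a
first-class distinction in Airflow itself would be so useful.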

On Tue, Apr 23, 2019 at 5:21 PM Kevin Lam <[email protected]> wrote:

> Hi all,
>
> We currently run Airflow as a Deployment in a kubernetes cluster. We run a
> single airflow worker with the CeleryExecutor and Redis for Messaging
> (similar to what is outlined in https://github.com/puckel/docker-airflow).
>
> We are trying to use airflow in a maximally fault-tolerant fashion.
> Specifically, we'd like to recover when the airflow worker goes down, such
> as due to the GCE compute instance it's hosted on being pre-empted [1] .
> How can we achieve this? See below for a specific example scenario:
>
>
>    -
>
>    Airflow is running with some DAGs in flight, with several tasks in the
>    'running' state
>    -
>
>    The node that the worker resides on is pre-empted (or otherwise
>    re-started), resulting in running jobs getting terminated
>    -
>
>    The airflow worker is automatically restarted by Kubernetes but all the
>    jobs it was running are terminated and the states of the tasks that were
>    'running' are lost
>    -
>
>    Any lost tasks from the jobs are zombies and eventually get marked
>    'failed' [2]
>    -
>
>    We'd like to either restore the state of the worker, or retry the jobs
>    it was running ***only*** in the case of these kinds of disruptions (ie.
>    node preemptions).
>    -
>
>    Note that we don't want to broadly enable task retries to avoid
>    re-running tasks for legitimate failures that cannot be recovered from,
>    e.g., a failure within our running task code causing the task to error
> out.
>
>
> One resource [3] suggests setting retries on DAG tasks, but this seems
> subpar, since a task can fail for reasons like the above (GKE/GCE
> preemption), or for reasons where it 'should' fail (like a fatal SIGTERM
> received by the process).
>
> In a previous thread [4], the KubernetesExecutor was suggested as being
> fault-tolerant, but, as far as we can tell, all that is stored and
> recovered upon restart is the resource versions and uuids of the worker
> pods [5]. Together these are enough to restore the KubernetesJobWatcher
> [6], but not enough to recover any disrupted workers and their state, so
> the KubernetesExecutor would suffer from the same issue above in the case
> of worker disruptions: workers dying with running jobs will leave zombie
> tasks that are marked failed [7].
>
> How can we achieve fault-tolerance, or at least fine-grained failure
> handling for worker failures in Airflow?
>
> [1] https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms
>
> [2] https://airflow.apache.org/concepts.html#zombies-undeads
>
> [3] https://www.astronomer.io/guides/dag-best-practices/
>
> [4] https://lists.apache.org/thread.html/7a7e1456db51b75666fdec9353453591de962495bdf235733170bb85@%3Cdev.airflow.apache.org%3E
>
> [5] https://github.com/apache/airflow/blob/e1d3df19992c76aff47fa18b9afca8ebf691e1da/airflow/models/kubernetes.py#L28
>
> [6] https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/executors/kubernetes_executor.py#L279
>
> [7] https://github.com/apache/airflow/blob/d2a2e2cbff7c7ac4bbcc47aa06e6cf4eb8ac8f57/airflow/jobs.py#L1791
>
