Hi Ping,

(The dev list doesn't allow attachments, so we can't see any of the images you've posted, so some of my questions might have been addressed by those images.)

It seems that a lot of the goals here overlap with AIP-1, the proposed separation of the DAG processor from the scheduler, and the multi-tenancy work in general. Your description of how the scheduler and DAG parsing process operate is based on the 1.10 mode of operation, but that has changed in 2.0: the scheduler _only_ operates on the serialized representation and doesn't need the result of the DAG parsing process. Breaking this tight coupling was one of the major speed-ups I achieved.

The exact details aren't clear from your email yet, but here are my initial comments:

1. Runtime isolation of task execution is already possible by using the KubernetesExecutor.

2. Running short-lived processes (such as what I think you are proposing for DAG parsing) in a Kube cluster isn't really practical, as the spin-up time of pods is highly variable and can be on the order of minutes.

3. Not everyone has docker available or is comfortable running it -- we 100% need to support running without Docker or containers still.

4. Many of our users are Data Scientists or Engineers, and so aren't happy with building containers.

On Thu, Dec 16 2021 at 15:52:02 -0800, Ping Zhang <[email protected]> wrote:
Hi Airflow Community,

This is Ping Zhang from the Airbnb Airflow team. We would like to open source our internal feature: docker runtime isolation for airflow tasks. It has been in our production for close to 1 year and it is very stable.

I will create an AIP after the discussion.

Thanks,

Ping

Motivation

An Airflow worker host is a shared resource among all tasks running on it. Thus, hosts must provision dependencies for all tasks, including system-level and python application-level dependencies. This leads to a very fat runtime, and therefore long host provision times and low elasticity in the worker resource. This makes it challenging to prepare for unexpected burst load, including a large backfill or a rerun of large DAGs.

The lack of runtime isolation makes operations challenging and risky, including adding or upgrading system and python dependencies, and it is almost impossible to remove any dependencies. It also incurs lots of additional operating costs for the team: users do not have permission to add or upgrade python dependencies, so we have to coordinate with them. Package version conflicts prevent installing the packages directly on the host, so users have to use PythonVirtualenvOperator, which slows down their development cycle.


What change do you propose to make?

To solve those problems, we propose introducing runtime isolation for Airflow tasks. It leverages docker as the task runtime environment. There are several benefits:

Provide runtime isolation on task level

Customize runtime to parse dag files

Lean runtime on airflow host, which enables high worker resource elasticity

Immutable and portable task execution runtime

Process isolation ensures that all subprocesses of a task are cleaned up after docker exits (we have seen some orphaned hive, spark subprocesses after the airflow run process exits)

Changes
Airflow Worker
In the new design, the `airflow run local` and `airflow run raw` processes are running inside a docker container, which is launched by an airflow worker. In this way, the airflow worker runtime only needs minimum requirements to run airflow core and docker.
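
A minimal sketch of how a worker might assemble such a container launch, assuming the 1.10-style `airflow run --local` CLI. The function name, the read-only DAGs mount and the default path are illustrative assumptions, not part of the proposal:

```python
from typing import List


def build_task_command(
    image: str,
    dag_id: str,
    task_id: str,
    execution_date: str,
    dags_folder: str = "/opt/airflow/dags",  # assumed path, for illustration only
) -> List[str]:
    """Build the argv a worker could pass to subprocess.run() to start a task.

    The container runs `airflow run --local`, so the host itself only needs
    airflow core and docker installed.
    """
    return [
        "docker", "run", "--rm",                   # remove the container once the task exits
        "-v", f"{dags_folder}:{dags_folder}:ro",   # make DAG files visible inside the container
        image,
        "airflow", "run", "--local", dag_id, task_id, execution_date,
    ]
```

The worker would hand the returned list to `subprocess.run`; when the container exits, every subprocess started inside it is gone as well.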

Airflow Scheduler
Instead of processing the DAG file directly, the DagFileProcessor process:

1. launches the docker container required by that DAG file to process it, and persists the serializable DAGs (SimpleDags) to a file so that the result can be read outside the docker container

2. reads the file persisted from the docker container, deserializes it and puts the result into the multiprocess queue


This ensures the DAG parsing runtime is exactly the same as DAG execution runtime.
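
The hand-off through a file could be sketched roughly as follows. This is an illustration under assumptions: JSON stands in for whatever serialization is actually used, and `run_container` is a hypothetical callable that would start the docker container with the shared directory mounted (a fake is used here so the sketch runs without docker):

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, List


def process_dag_file(
    dag_file: str,
    image: str,
    run_container: Callable[[str, str, Path], None],  # hypothetical launcher
) -> List[dict]:
    """Parse a DAG file inside a container and read the result on the host."""
    with tempfile.TemporaryDirectory() as shared_dir:
        result_path = Path(shared_dir) / "serialized_dags.json"
        # Step 1: the container parses the DAG file and persists the result.
        run_container(image, dag_file, result_path)
        # Step 2: the host reads the file back and deserializes it.
        return json.loads(result_path.read_text())


def fake_container(image: str, dag_file: str, result_path: Path) -> None:
    """Stand-in for the real container: writes one serialized DAG record."""
    record = {"dag_id": "example", "file": dag_file, "image": image}
    result_path.write_text(json.dumps([record]))
```

In the real flow, the list returned by `process_dag_file` is what would be put on the multiprocess queue.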

This requires each DAG definition file to tell the DAG file processing loop which docker image to use to process it. We can easily achieve this by placing a metadata file alongside the DAG definition file to define the docker runtime. To ease the burden on users, a default docker image is used when a DAG definition file does not require a customized runtime.
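
One possible shape for that lookup, as a sketch only: the sidecar naming (`my_dag.runtime.json` next to `my_dag.py`), the `docker_image` key and the default image name are all assumptions made for illustration.

```python
import json
from pathlib import Path

# Assumed default image name, for illustration only.
DEFAULT_IMAGE = "airflow-default:latest"


def resolve_image(dag_file: Path, default: str = DEFAULT_IMAGE) -> str:
    """Return the docker image to use for parsing and running a DAG file.

    Looks for a sidecar metadata file next to the DAG definition file and
    falls back to the default image when none exists.
    """
    metadata_file = dag_file.with_suffix(".runtime.json")
    if not metadata_file.exists():
        return default
    metadata = json.loads(metadata_file.read_text())
    return metadata.get("docker_image", default)
```

Using the same resolved image for both parsing and execution is what keeps the two runtimes identical.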

As a Whole





Best wishes

Ping Zhang
