Basic proposal:

Change the `queue` column in the TaskInstance table from a varchar(n) to a
pickle type. Additionally, rename the column from `queue` to `executor_config`.

Why?

Currently the `queue` column is only used by the CeleryExecutor, since the
concept of a `queue` doesn't make sense for other executors. Adding an extra
column that only one executor uses was a fine solution when Airflow did not
have many pluggable backends, but as the number of backends that need
configuration grows, it makes sense to introduce an abstraction for
"executor-specific configuration".

This proposal came out of conversations about the KubernetesExecutor,
specifically the need to configure properties of the Kubernetes pod on a
per-task basis (think: docker image, volume mounts, secrets, etc.). One
proposal during the meeting was to add a new "docker_image" column to the
baseoperator/task_instance table. However, that means adding a custom column
that only one executor will use, increasing the number of unused fields in
the baseoperator and requiring Airflow core code changes whenever a new
configuration field is needed.

Moving all executor-specific configuration into a single `executor_config`
field would let us extend Airflow with new backends and new configs without
any core code changes.
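Since the column is pickle-typed, the round trip is simple: the scheduler
stores whatever dict the task supplied, and each executor unpickles it and
reads only the keys it understands. Here is a minimal sketch of that round
trip; the variable names and the "default" fallback are illustrative, not
actual Airflow code:

```python
# Sketch of the round trip implied by a pickle-typed executor_config column.
import pickle

executor_config = {"queue": "my-small-queue"}

# What would be stored in the executor_config column:
stored = pickle.dumps(executor_config)

# What a given executor would do on the other side: unpickle, then read
# only the keys it cares about, ignoring everything else.
loaded = pickle.loads(stored)
queue = loaded.get("queue", "default")  # CeleryExecutor-style lookup
```

The key design point is that the column is opaque to Airflow core; only the
executor assigns meaning to the keys.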

Here is a before-and-after view of the switch (targeting a Celery backend):

t1 = PythonOperator(
    task_id="task1",
    python_callable=my_func,
    queue="my-small-queue"
)

t1 = PythonOperator(
    task_id="task1",
    python_callable=my_func,
    executor_config={"queue": "my-small-queue"}
)
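On the Celery side, the executor would resolve the queue from
`executor_config` while staying backward compatible with tasks that don't set
one. A hedged sketch, where `DEFAULT_QUEUE` and the helper function are
assumptions of mine rather than Airflow APIs:

```python
# Hypothetical helper showing how a CeleryExecutor could resolve the queue
# under the new scheme. Tasks with no executor_config (or no "queue" key)
# fall back to the executor's default queue.
DEFAULT_QUEUE = "default"

def resolve_queue(executor_config):
    """Pick the Celery queue from a task's executor_config, if any."""
    if not executor_config:
        return DEFAULT_QUEUE
    return executor_config.get("queue", DEFAULT_QUEUE)
```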

Here is what task configuration for the Kubernetes Executor could look like 
using this concept:

t1 = PythonOperator(
    task_id="task1",
    python_callable=my_func,
    executor_config={"image": "my-special-image:latest", "volume_mounts": "..."}
)
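Because the column is an opaque pickle, each executor has to validate the
keys itself, which is the main downside mentioned below. A sketch of what
that check could look like inside a Kubernetes executor; `KNOWN_KEYS` and
the function name are illustrative assumptions:

```python
# Illustrative per-executor validation of executor_config keys. Each
# executor defines the keys it understands and rejects anything else,
# since Airflow core no longer enforces a schema for this field.
KNOWN_KEYS = {"image", "volume_mounts", "secrets"}

def validate_kube_config(executor_config):
    """Reject executor_config keys this executor does not understand."""
    unknown = set(executor_config) - KNOWN_KEYS
    if unknown:
        raise ValueError(f"Unknown executor_config keys: {sorted(unknown)}")
    return executor_config
```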

Let me know what your thoughts are. At my current company, I implemented this
feature and we are using it extensively to configure our KubernetesExecutor
plugin on a per-task basis. While there are a few downsides (mainly having to
handle misconfigurations inside the executor), overall it has worked well for
us.
