GitHub user ywmvis edited a comment on the discussion: How to handle long 
running tasks with the Kubernetes Operator?

Hi potiuk, thank you for the suggestions 👍 

> Use Hybrid Celery Kubernetes Executor and make all the small and fast tasks 
> run through Celery - that will limit the overhead incurred by many PODs being 
> created to just run a small and fast thing (will decrease the pressure on K8S)

> Limit parallelism of certain tasks in Airflow - Airflow has a number of ways 
> to limit parallelism - for example by using Pools, Queues, various dags, 
> tasks and configuration parameters - for example here: 
> https://airflow.apache.org/docs/apache-airflow/stable/faq.html#how-to-improve-dag-performance
>  - this will prevent the Airflow scheduler from even scheduling tasks for 
> execution if related tasks are already scheduled and would exceed the 
> parallelism settings

We already use the Celery Kubernetes Executor: we started with the Celery 
Executor and have since switched to the Celery Kubernetes Executor.

What we do within the DAGs is run "blackbox" Docker images that do computation 
on big chunks of data.
Within a DAG we define a workflow like `[Input Data] -> Computation docker A 
-> (Computation docker B | Computation docker C)`.

The input data can vary in size from really small chunks of data to files of 
several TB. The same goes for the computation dockers: some run fast, some take 
a long computation time and a lot of resources.

Since we can influence neither the input data size nor the computation dockers, 
we cannot really estimate the runtime.

The only thing we know about each computation docker is how many resources it 
requires in a worst-case scenario.

What we did in the past (and what worked for us, with a few downsides):

- We created pools "ComputationSmall, ComputationMedium, ComputationLarge" and 
assigned the computation dockers to the matching pool.

As an example:

- ComputationSmall is designed to run computation dockers with 2 CPU cores per 
docker and max 1 GB RAM
- ComputationMedium is designed to run computation dockers with 4 CPU cores per 
docker and max 16 GB RAM
- ComputationLarge is designed to run computation dockers with 4 CPU cores per 
docker and max 32 GB RAM

Knowing the available resources of each Celery node, we limited the parallelism 
within the pool to the available resources:

-> ComputationSmall Celery node with 32 CPU cores and 16 GB RAM --> 16 slots

This way we know that Airflow won't queue tasks above our resource limits and 
cause individual tasks to get OOM-killed.
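For illustration, the slot count per pool follows mechanically from the node 
and per-task sizing; a minimal sketch (the function name is ours, not an 
Airflow API):

```python
def pool_slots(node_cpu, node_ram_gb, task_cpu, task_ram_gb):
    """Parallelism a node supports, bounded by the scarcer resource."""
    return min(node_cpu // task_cpu, node_ram_gb // task_ram_gb)

# ComputationSmall node (32 CPU cores, 16 GB RAM), tasks sized 2 cores / 1 GB:
print(pool_slots(32, 16, 2, 1))  # -> 16
```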

But this approach had a few downsides:

- If we have only "ComputationLarge" tasks for a few days, only the 
ComputationLarge node is used while the ComputationSmall and ComputationMedium 
nodes sit idle, wasting "blocked" resources
- Extending the computation resources can't be done on the Celery node side 
alone; it always requires keeping the Airflow parallelism in sync with the 
available resources
- If a previous task fails during computation and is not cleaned up properly 
(e.g. the docker is not shut down properly and keeps running), parallelism / 
slots are again out of sync with the available resources, causing tasks to fail 
because they are stuck in the queue, or to get OOM-killed because of 
unavailable resources. 

What Airflow seems to be missing here is a way to schedule based on resources 
instead of the more abstract pools and slots.
We first thought we could have just one pool and match the number of slots to 
the available resources, e.g. 1 slot = 100 MB of available RAM, so a task that 
requests 1 GB of RAM takes 10 slots. But this way we can only schedule based on 
one resource (in this example RAM), not on multiple resources (RAM and CPU).
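As far as we can tell, Airflow's per-task `pool_slots` parameter supports 
exactly this mapping (a task can occupy more than one slot of its pool), but a 
quick sketch shows why a single slot currency cannot capture a second resource 
dimension:

```python
SLOT_MB = 100  # our mapping: 1 pool slot = 100 MB of RAM

def slots_for(ram_mb):
    """Slots a task must occupy under the RAM-only mapping."""
    return ram_mb // SLOT_MB

node_slots = slots_for(16_000)  # a 16 GB node exposes 160 slots
task_slots = slots_for(1_000)   # a task requesting 1 GB takes 10 slots

# By RAM the node fits 16 such tasks -- but if each task also needs
# 4 CPU cores, those 16 tasks would demand 64 cores on a 32-core node.
# The slot count says "fits"; the CPU dimension says it does not.
print(node_slots // task_slots)  # -> 16
```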

What we did to resolve the downsides:

- Switched from Celery Executor to Celery Kubernetes Executor
- All small "management" tasks run on the Celery nodes (to eliminate pod 
overhead); all computation tasks run on Kubernetes
- Each computation task is assigned a Kubernetes resource request for its CPU 
and RAM demands
- Kubernetes only picks tasks from the queue if the resources are available
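For reference, this is the shape of the per-task resource request we attach (a 
sketch using the standard Kubernetes container `resources` fields; the values 
are the ComputationLarge sizing from above):

```yaml
# container resources for a ComputationLarge task pod
resources:
  requests:
    cpu: "4"
    memory: "32Gi"
  limits:
    cpu: "4"
    memory: "32Gi"
```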

This way we can simply scale the resources within the Kubernetes cluster, and 
Kubernetes manages the resource demands and always tries to max out all 
available resources.

During testing everything seemed to work just fine this way, except that 
Airflow removes the (expectedly) waiting tasks from the queue after a while.

What kind of issues would an int-max value for "task_queued_timeout" cause? 
Does it really mean tasks stuck in the queue for known reasons (e.g. a 
Kubernetes API error) are not removed before task_queued_timeout is reached?
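For context, a sketch of where that knob lives (if we read the docs right, the 
option sits under `[scheduler]` in airflow.cfg since Airflow 2.6, with a 
default of 600 seconds):

```ini
[scheduler]
# Seconds a task may stay queued before the scheduler gives up on it.
# Raising it tolerates long waits for cluster capacity instead of failing.
task_queued_timeout = 86400
```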

I think the main issue is not the removal of tasks from the queue; this would 
still be fine if the task were just put back into the backlog and scheduled 
again later on. It is more the "failed" state of the task which seems to be 
causing the issues on our side.

We first thought "put back to backlog" would be equivalent to "retry", but this 
also seems not to be the same, because there is no way to differentiate between 
"retry tasks stuck in queue" and "retry failed tasks". 

Retry because of "task stuck in queue", in combination with retry backoff, 
would be great and might solve our issue.
But for tasks failing during execution (computation failed), a retry does not 
make sense in our case: running the exact same computation on the exact same 
data would just cause the same failure again. There seems to be no mechanism to 
retry only "stuck in queued" tasks but not "failed during execution" tasks.


GitHub link: 
https://github.com/apache/airflow/discussions/45503#discussioncomment-11789623
