fshehadeh opened a new issue, #56444: URL: https://github.com/apache/airflow/issues/56444
### Apache Airflow version

3.1.0

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

We run Airflow with a LocalExecutor that in turn runs tasks in an AWS ECS cluster. We have a situation where we want a large number of tasks to run at the same time (some tasks consume data from other tasks). This setup worked without issues on Airflow 2.x, with a `parallelism` configuration matching the number of desired tasks.

However, after we upgraded to Airflow 3.x, we noticed that some of the tasks would run right away while others got queued and did not execute until the first group of tasks had completed, suggesting that the local executor was unable to spawn enough child processes to match the `parallelism` configuration. I created a DAG with a few long-running tasks and started a few runs in parallel, expecting the number of spawned children to match the `parallelism` configuration, but that didn't happen: the total number of children was always smaller than the maximum.

Looking at the code, I noticed this comment:
https://github.com/apache/airflow/blob/4ecebc2973587ebaa2cb12482de82e93d15c092f/airflow-core/src/airflow/executors/local_executor.py#L186

> # If we're using spawn in multiprocessing (default on macOS now) to start tasks, this can get called a
> # via `sync()` a few times before the spawned process actually starts picking up messages. Try not to
> # create too much
>
> need_more_workers = len(self.workers) < num_outstanding
> if need_more_workers and (self.parallelism == 0 or len(self.workers) < self.parallelism):
>     # This only creates one worker, which is fine as we call this directly after putting a message on
>     # activity_queue in execute_async
>     self._spawn_worker()

This code can skip spawning a new child when it sees that there are already enough workers to cover the outstanding tasks. Unfortunately, those workers may already be running other tasks, which means the new tasks have to wait for them to finish before they are picked up. This defeats the purpose and definition of `parallelism`. I don't think the check is necessary, even if children are spawned asynchronously: what matters is comparing the number of requested workers (whether or not they have actually started yet) against the `parallelism` config. In my setup I ended up patching the file above to remove the `need_more_workers` check (a sketch of the change is included at the end of this section), and with that change the number of children now matches the `parallelism` configuration.

As a side note: our tasks can take a long time to complete (more than 10 minutes), and when the queued tasks eventually started, they would call the API to mark the task as started, only for that call to fail with a 403 because the JWT token had expired by then. Increasing the API JWT expiry might have worked around the 403, but the root problem is that not enough tasks were allowed to run in parallel.

This is probably a low-priority issue, since I guess it is not common to use a local executor and demand high parallelism at the same time.
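For reference, the local patch described above boils down to dropping the `need_more_workers` condition. The following is only a simplified sketch of that behaviour change, not the actual diff: `ExecutorSketch` and `maybe_spawn_worker` are illustrative names for a toy stand-in class, and only `workers`, `parallelism` and `_spawn_worker` are taken from the quoted snippet.

```
# Minimal, self-contained sketch of the behaviour change; NOT the real
# LocalExecutor, just a stand-in reusing the names from the quoted snippet.
class ExecutorSketch:
    def __init__(self, parallelism: int) -> None:
        self.parallelism = parallelism
        self.workers: list[object] = []

    def _spawn_worker(self) -> None:
        # Stand-in for spawning a real worker process.
        self.workers.append(object())

    def maybe_spawn_worker(self, num_outstanding: int) -> None:
        # Original check: skip spawning once len(self.workers) >= num_outstanding,
        # even if every existing worker is still busy with a long-running task:
        #   need_more_workers = len(self.workers) < num_outstanding
        #   if need_more_workers and (self.parallelism == 0 or len(self.workers) < self.parallelism):
        #       self._spawn_worker()
        # Patched check: only cap on parallelism (0 means unlimited).
        if self.parallelism == 0 or len(self.workers) < self.parallelism:
            self._spawn_worker()


if __name__ == "__main__":
    executor = ExecutorSketch(parallelism=32)
    # Pretend 10 workers already exist and are all busy with long-running tasks.
    for _ in range(10):
        executor._spawn_worker()
    # 5 new task messages arrive. The original check would see
    # len(workers)=10 >= num_outstanding=5 and spawn nothing, so the new tasks
    # would wait behind the busy workers; the patched check spawns up to parallelism.
    for _ in range(5):
        executor.maybe_spawn_worker(num_outstanding=5)
    print(len(executor.workers))  # 15 with the patched check, still 10 with the original
```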
### What you think should happen instead?

The maximum number of child tasks spawned by Airflow should match the `parallelism` config.

### How to reproduce

I created the following DAG to ramp up the number of tasks and verify how many can run in parallel. I used it while configuring Airflow to use the LocalExecutor with the default `parallelism` of 32.

```
from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.standard.operators.python import PythonOperator
import time
import logging

LOGGER = logging.getLogger(__name__)

# This DAG is meant to help verify the configuration of maximum
# tasks in the LocalExecutor


def simulate_work():
    count = 20
    LOGGER.info(f"Running the loop for {count} times")
    for i in range(count):
        LOGGER.info(f"Attempt: {i+1}")
        time.sleep(60)
    LOGGER.info("Done")


dag = DAG(
    dag_id='Test_Parallelism_Bug',
    max_active_runs=10,
    max_active_tasks=100,
    catchup=False
)

for i in range(5):
    task = PythonOperator(task_id=f"task-{i}", python_callable=simulate_work, dag=dag, retries=0)
```

### Operating System

Using the Airflow Docker image

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other Docker-based deployment

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
