fshehadeh opened a new issue, #56444:
URL: https://github.com/apache/airflow/issues/56444

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   We run Airflow with a LocalExecutor that in turn runs tasks in an AWS ECS 
cluster. We have a situation where we want a large number of tasks to run at 
the same time (some tasks consume data from other tasks). We have been using 
this setup without issues with Airflow 2.x, using a parallelism configuration 
that matches the number of desired tasks. However, when we upgraded to Airflow 
3.x, we noticed that some of the tasks would run right away, while others would get queued and would not execute until the first group of tasks had completed, suggesting that the local executor was unable to spawn enough child tasks to match the parallelism configuration.
   
   I created a DAG that creates a few long-running tasks and started a few runs in parallel, expecting the number of spawned child processes to match the parallelism configuration, but that did not happen. The total number of child processes was always smaller than the maximum.
   
   Looking at the code, I noticed this comment: https://github.com/apache/airflow/blob/4ecebc2973587ebaa2cb12482de82e93d15c092f/airflow-core/src/airflow/executors/local_executor.py#L186
   >         # If we're using spawn in multiprocessing (default on macOS now) to start tasks, this can get called a
   >         # via `sync()` a few times before the spawned process actually starts picking up messages. Try not to
   >         # create too much
   >
   >         need_more_workers = len(self.workers) < num_outstanding
   >         if need_more_workers and (self.parallelism == 0 or len(self.workers) < self.parallelism):
   >             # This only creates one worker, which is fine as we call this directly after putting a message on
   >             # activity_queue in execute_async
   >             self._spawn_worker()
   
   The code can effectively skip spawning a new child when it sees that we already have enough workers to cover the outstanding tasks. Unfortunately, those workers may already be busy running other tasks, which means the new tasks have to wait for them to finish before they are picked up. This defeats the purpose and definition of parallelism. I don't think the check is necessary, even though child processes are spawned asynchronously; the important thing is to compare the number of requested workers (whether or not they have actually started yet) against the parallelism config.
   
   In my setup, I ended up patching the file above to remove the `need_more_workers` check, and with this change the number of child processes can now reach the parallelism configuration.
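   
   For reference, this is roughly what that local patch looks like, shown as a sketch against the quoted snippet (indentation and surrounding context are approximate, not the exact upstream source):
   
   ```
   -        need_more_workers = len(self.workers) < num_outstanding
   -        if need_more_workers and (self.parallelism == 0 or len(self.workers) < self.parallelism):
   +        if self.parallelism == 0 or len(self.workers) < self.parallelism:
                # This only creates one worker, which is fine as we call this directly after putting a message on
                # activity_queue in execute_async
                self._spawn_worker()
   ```
   
   With this change the only cap on spawning is the parallelism setting itself, which is the behaviour I expected.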
   
   As a side note: our tasks can take a long time to complete (more than 10 minutes), and when the queued tasks eventually started, they would call the API to mark the task as started, but that call would fail with a 403 because the JWT token had expired by then. Increasing the API JWT expiry might have fixed the 403 error, but the root problem is that not enough tasks were allowed to run in parallel.
   
   This is probably a low-priority issue, since I guess it is not common to use the local executor and demand high parallelism at the same time.
   
   ### What you think should happen instead?
   
   The maximum number of child tasks spawned by Airflow should match the parallelism config.
   
   ### How to reproduce
   
   I created the following DAG to ramp up the number of tasks and verify how many actually run in parallel. I ran it with Airflow configured to use the LocalExecutor and the default parallelism of 32 (the configuration is sketched after the DAG below).
   
   ```
    from airflow.models import DAG
    from airflow.providers.standard.operators.python import PythonOperator
    import time

    import logging
    LOGGER = logging.getLogger(__name__)


    # This DAG is meant to help verify the maximum number of tasks
    # the LocalExecutor will run concurrently.


    def simulate_work():
        # Keep the task busy for ~20 minutes so that many tasks overlap.
        count = 20
        LOGGER.info(f"Running the loop for {count} times")
        for i in range(count):
            LOGGER.info(f"Attempt: {i + 1}")
            time.sleep(60)
        LOGGER.info("Done")


    dag = DAG(
        dag_id='Test_Parallelism_Bug',
        max_active_runs=10,
        max_active_tasks=100,
        catchup=False,
    )

    # Five independent long-running tasks per run; with several runs active,
    # the total should exceed the parallelism setting.
    for i in range(5):
        PythonOperator(
            task_id=f"task-{i}",
            python_callable=simulate_work,
            dag=dag,
            retries=0,
        )
   ```
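   
   For completeness, here is a sketch of the relevant configuration the DAG ran against, in `airflow.cfg` form (only the options relevant to this report; everything else was left at its defaults):
   
   ```
    [core]
    executor = LocalExecutor
    # Default value; up to 32 tasks should be able to run at once.
    parallelism = 32
   ```
   
   I then triggered several runs close together (e.g. `airflow dags trigger Test_Parallelism_Bug`, repeated a few times) and checked how many task processes were actually running at once.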
   
   ### Operating System
   
   Using the Airflow Docker image
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

