mik-laj opened a new pull request #6905: [AIRFLOW-6361] Run LocalTaskJob 
directly in Celery task
URL: https://github.com/apache/airflow/pull/6905
 
 
   Hello,
   
   The executor runs multiple processes to perform a single task. Many of these 
processes are short-lived, so the cost of starting them is a significant 
overhead.
   
   First, the Celery executor triggers a Celery task (app.task). This task runs 
the CLI command (first process), which contains LocalTaskJob. LocalTaskJob 
runs a separate command (second process) that executes user code. Isolating 
LocalTaskJob in its own process is redundant because LocalTaskJob itself doesn't 
execute unsafe code. The first command is run by creating a new process, not by 
a fork, so this is an expensive operation. I suggest running the code from the 
first process as part of the Celery task to reduce the number of new processes.
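
To illustrate why the extra process matters, here is a minimal, generic sketch (not Airflow code) that contrasts running a small job in a freshly started Python interpreter with running the same job in-process. The names `run_job`, `run_in_new_process`, `timed`, and `JOB_SOURCE` are hypothetical and exist only for this example; the job itself is a trivial stand-in for the work done by the first process.

```python
import subprocess
import sys
import time

# Hypothetical stand-in for the work done by the first process.
JOB_SOURCE = "print(sum(range(1000)))"


def run_job() -> int:
    """Run the job directly in the current process."""
    return sum(range(1000))


def run_in_new_process() -> int:
    """Run the same job in a fresh interpreter that must start from scratch."""
    output = subprocess.check_output([sys.executable, "-c", JOB_SOURCE])
    return int(output)


def timed(func):
    """Return the function's result together with its wall-clock duration."""
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start


if __name__ == "__main__":
    for func in (run_in_new_process, run_job):
        result, elapsed = timed(func)
        print(f"{func.__name__}: result={result} elapsed={elapsed:.4f}s")
```

Both variants produce the same result; only the start-up cost differs. In a real worker the new interpreter also has to re-import the application's modules, which this toy job does not model.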
   
   The code currently uses CLIFactory to run the LocalTaskJob. It would be 
better to do this without the unnecessary dependency on the CLI, but that is a 
big change and I plan to do it in a separate PR.
   WIP PR: https://github.com/mik-laj/incubator-airflow/pull/10 (Travis green 
:-D )
   
   Performance benchmark: 
   ===================
   Example DAG from Airflow with the unneeded sleep instructions removed.
   ```python
   """Example DAG demonstrating the usage of the BashOperator."""
   
   from datetime import timedelta
   
   import airflow
   from airflow.models import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.dummy_operator import DummyOperator
   
   args = {
       'owner': 'airflow',
       'start_date': airflow.utils.dates.days_ago(2),
   }
   
   dag = DAG(
       dag_id='example_bash_operator',
       default_args=args,
       schedule_interval='0 0 * * *',
       dagrun_timeout=timedelta(minutes=60),
   )
   
   run_this_last = DummyOperator(
       task_id='run_this_last',
       dag=dag,
   )
   
   # [START howto_operator_bash]
   run_this = BashOperator(
       task_id='run_after_loop',
       bash_command='echo 1',
       dag=dag,
   )
   # [END howto_operator_bash]
   
   run_this >> run_this_last
   
   for i in range(3):
       task = BashOperator(
           task_id='runme_' + str(i),
           bash_command='echo "{{ task_instance_key_str }}"',
           dag=dag,
       )
       task >> run_this
   
   # [START howto_operator_bash_template]
   also_run_this = BashOperator(
       task_id='also_run_this',
       bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
       dag=dag,
   )
   # [END howto_operator_bash_template]
   also_run_this >> run_this_last
   
   if __name__ == "__main__":
       dag.cli()
   
   ```
   Benchmark script (`/files/benchmark_speed.py`):
   ```python
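   from airflow import DAG
   from airflow.models import DagBag
   
   # Load the available DAGs and look up the example DAG.
   dagbag = DagBag()
   dag: DAG = dagbag.get_dag("example_bash_operator")
   
   # Clear existing task instances so every run starts from scratch, then run the DAG.
   dag.clear()
   dag.run()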
   ```
   Environment: Breeze
   ```
   unset AIRFLOW__CORE__DAGS_FOLDER
   unset AIRFLOW__CORE__UNIT_TEST_MODE
   chmod -R 777 /root
   sudo -E su airflow
   export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
   export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
   export AIRFLOW__CELERY__WORKER_CONCURRENCY=8
   seq 1 10 | xargs -n 1 -I {} bash -c "time python /files/benchmark_speed.py > /dev/null 2>&1" | grep '^(real\|user\|sys)';
   ```
   
   Result: 
   
   |Statistic (real time, s) | Before | After | Change|
   |--------|-------|--------|-------|
   |AVERAGE | 56.48 | 38.32  | -32%  |
   |VAR     | 23.60 | 0.04   | -98%  |
   |MAX     | 68.29 | 38.68  | -43%  |
   |MIN     | 53.26 | 38.08  | -28%  |
   |STDEV   | 4.86  | 0.19   | -96%  |
   
   Raw data
   After:
   ```
   real 0m38.394s
   user 0m4.340s
   sys 0m1.600s
   
   real 0m38.355s
   user 0m4.700s
   sys 0m1.340s
   
   real 0m38.675s
   user 0m4.760s
   sys 0m1.530s
   
   real 0m38.488s
   user 0m4.770s
   sys 0m1.280s
   
   real 0m38.434s
   user 0m4.600s
   sys 0m1.390s
   
   real 0m38.378s
   user 0m4.500s
   sys 0m1.270s
   
   real 0m38.106s
   user 0m4.200s
   sys 0m1.100s
   
   real 0m38.082s
   user 0m4.170s
   sys 0m1.030s
   
   real 0m38.173s
   user 0m4.290s
   sys 0m1.340s
   
   real 0m38.161s
   user 0m4.460s
   sys 0m1.370s
   ```
   
   Before:
   ```
   real 0m53.488s
   user 0m5.140s
   sys 0m1.700s
   
   real 1m8.288s
   user 0m6.430s
   sys 0m2.200s
   
   real 0m53.371s
   user 0m5.330s
   sys 0m1.630s
   
   real 0m58.939s
   user 0m6.470s
   sys 0m1.730s
   
   real 0m53.255s
   user 0m4.950s
   sys 0m1.640s
   
   real 0m58.802s
   user 0m5.970s
   sys 0m1.790s
   
   real 0m58.449s
   user 0m5.380s
   sys 0m1.580s
   
   real 0m53.308s
   user 0m5.120s
   sys 0m1.430s
   
   real 0m53.485s
   user 0m5.220s
   sys 0m1.290s
   
   real 0m53.387s
   user 0m5.020s
   sys 0m1.590s
   ```
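
As a cross-check, the summary statistics can be recomputed from the raw `real` timings listed above. The following is a minimal sketch, not the script used to produce the table: the helper names (`parse_real`, `summarize`, `relative_change`) are made up for this example, and it assumes the table uses the sample variance and sample standard deviation (n - 1 in the denominator).

```python
import statistics

# "real" timings copied from the raw data above.
BEFORE = ["0m53.488s", "1m8.288s", "0m53.371s", "0m58.939s", "0m53.255s",
          "0m58.802s", "0m58.449s", "0m53.308s", "0m53.485s", "0m53.387s"]
AFTER = ["0m38.394s", "0m38.355s", "0m38.675s", "0m38.488s", "0m38.434s",
         "0m38.378s", "0m38.106s", "0m38.082s", "0m38.173s", "0m38.161s"]


def parse_real(value: str) -> float:
    """Convert a bash `time` value such as '1m8.288s' to seconds."""
    minutes, seconds = value.rstrip("s").split("m")
    return int(minutes) * 60 + float(seconds)


def summarize(raw):
    """Compute the table's statistics (assumed to be sample statistics)."""
    values = [parse_real(v) for v in raw]
    return {
        "AVERAGE": statistics.mean(values),
        "VAR": statistics.variance(values),
        "MAX": max(values),
        "MIN": min(values),
        "STDEV": statistics.stdev(values),
    }


def relative_change(before: float, after: float) -> float:
    """Percentage change from `before` to `after`."""
    return (after - before) / before * 100


if __name__ == "__main__":
    before, after = summarize(BEFORE), summarize(AFTER)
    for name in before:
        change = relative_change(before[name], after[name])
        print(f"{name:8} {before[name]:6.2f} {after[name]:6.2f} {change:+.0f}%")
```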
   
   ---
   Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6361
   
   - [x] Description above provides context of the change
   - [x] Commit message starts with `[AIRFLOW-NNNN]`, where AIRFLOW-NNNN = JIRA 
ID*
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   (*) For document-only changes, no JIRA issue is needed. Commit message 
starts `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
