[
https://issues.apache.org/jira/browse/AIRFLOW-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046846#comment-17046846
]
ASF GitHub Bot commented on AIRFLOW-6361:
-----------------------------------------
mik-laj commented on pull request #6905: [AIRFLOW-6361] Run LocalTaskJob
directly in Celery task
URL: https://github.com/apache/airflow/pull/6905
Hello,
The executor starts several processes to run a single task. Many of these
processes are short-lived, so the cost of starting them is a significant
overhead.
First, the Celery executor triggers a Celery task (`app.task`). This task runs
a CLI command (first process), which contains LocalTaskJob. LocalTaskJob then
runs a separate command (second process) that executes the user code. The
first level of isolation is redundant because LocalTaskJob doesn't execute
unsafe code. The first command is started by creating a new process, not by a
fork, so it is an expensive operation. I suggest running the code from the
first process as part of the Celery task to reduce the number of new processes.
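To illustrate why starting a new interpreter is costly compared with calling the same code in-process, here is a minimal sketch. It does not use Airflow or Celery; `task_body`, `run_in_new_interpreter`, and `run_in_process` are hypothetical names chosen for this example, and the work being timed is a trivial stand-in for what the first process does.

```python
import subprocess
import sys
import time


def task_body() -> int:
    """Hypothetical stand-in for the work done by the first process."""
    return sum(range(1000))


def run_in_new_interpreter() -> float:
    """Run the work in a freshly started Python interpreter.

    This mimics launching a CLI command as a new process (not a fork).
    Returns the elapsed wall-clock time in seconds.
    """
    start = time.perf_counter()
    subprocess.run(
        [sys.executable, "-c", "print(sum(range(1000)))"],
        check=True,
        stdout=subprocess.DEVNULL,
    )
    return time.perf_counter() - start


def run_in_process() -> float:
    """Run the same work as a plain function call in the current process."""
    start = time.perf_counter()
    task_body()
    return time.perf_counter() - start


if __name__ == "__main__":
    spawn = run_in_new_interpreter()
    direct = run_in_process()
    print(f"new interpreter: {spawn:.4f}s, in-process call: {direct:.6f}s")
```

The absolute numbers depend on the machine, but the new-interpreter path pays for process creation and interpreter start-up on every call, which is the overhead this PR aims to remove for the first process.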
The code currently uses CLIFactory to run the LocalTaskJob. It would be better
to do this without an unnecessary dependency on the CLI, but that is a big
change and I plan to do it in a separate PR.
WIP PR: https://github.com/mik-laj/incubator-airflow/pull/10 (Travis green
:-D )
Performance benchmark:
===================
Example DAG from Airflow with unneeded sleep instructions deleted.
```python
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)
run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)
# [START howto_operator_bash]
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
    task = BashOperator(
        task_id='runme_' + str(i),
        bash_command='echo "{{ task_instance_key_str }}"',
        dag=dag,
    )
    task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
if __name__ == "__main__":
    dag.cli()
```
```python
from airflow import DAG
from airflow.models import DagBag
# Load the DAGs, then clear and re-run the example DAG.
dagbag = DagBag()
dag: DAG = dagbag.get_dag("example_bash_operator")
dag.clear()
dag.run()
```
Environment: Breeze
```
unset AIRFLOW__CORE__DAGS_FOLDER
unset AIRFLOW__CORE__UNIT_TEST_MODE
chmod -R 777 /root
sudo -E su airflow
export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
export AIRFLOW__CELERY__WORKER_CONCURRENCY=8
seq 1 10 | xargs -n 1 -I {} bash -c "time python /files/benchmark_speed.py > /dev/null 2>&1" | grep '^(real\|user\|sys)';
```
Result:
|Fn.     | Before (s) | After (s) | Change |
|--------|------------|-----------|--------|
|AVERAGE | 56.48      | 38.32     | -32%   |
|VAR     | 23.60      | 0.04      | -99.8% |
|MAX     | 68.29      | 38.68     | -43%   |
|MIN     | 53.26      | 38.08     | -28%   |
|STDEV   | 4.86       | 0.19      | -96%   |
Raw data
After:
```
real 0m38.394s
user 0m4.340s
sys 0m1.600s
real 0m38.355s
user 0m4.700s
sys 0m1.340s
real 0m38.675s
user 0m4.760s
sys 0m1.530s
real 0m38.488s
user 0m4.770s
sys 0m1.280s
real 0m38.434s
user 0m4.600s
sys 0m1.390s
real 0m38.378s
user 0m4.500s
sys 0m1.270s
real 0m38.106s
user 0m4.200s
sys 0m1.100s
real 0m38.082s
user 0m4.170s
sys 0m1.030s
real 0m38.173s
user 0m4.290s
sys 0m1.340s
real 0m38.161s
user 0m4.460s
sys 0m1.370s
```
Before:
```
real 0m53.488s
user 0m5.140s
sys 0m1.700s
real 1m8.288s
user 0m6.430s
sys 0m2.200s
real 0m53.371s
user 0m5.330s
sys 0m1.630s
real 0m58.939s
user 0m6.470s
sys 0m1.730s
real 0m53.255s
user 0m4.950s
sys 0m1.640s
real 0m58.802s
user 0m5.970s
sys 0m1.790s
real 0m58.449s
user 0m5.380s
sys 0m1.580s
real 0m53.308s
user 0m5.120s
sys 0m1.430s
real 0m53.485s
user 0m5.220s
sys 0m1.290s
real 0m53.387s
user 0m5.020s
sys 0m1.590s
```
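The summary statistics can be recomputed from the `real` lines of the raw data above. The following is a minimal sketch using only the Python standard library; the helper names `parse_real_seconds` and `summarize` are hypothetical, and it assumes the sample variance and sample standard deviation (n - 1 denominator) were used.

```python
import re
import statistics
from typing import Dict, List

# Matches lines such as "real 1m8.288s" produced by the shell's `time`.
REAL_LINE = re.compile(r"^real\s+(\d+)m([\d.]+)s$")

# Only the "real" (wall-clock) lines from the raw data are needed.
BEFORE = """
real 0m53.488s
real 1m8.288s
real 0m53.371s
real 0m58.939s
real 0m53.255s
real 0m58.802s
real 0m58.449s
real 0m53.308s
real 0m53.485s
real 0m53.387s
"""

AFTER = """
real 0m38.394s
real 0m38.355s
real 0m38.675s
real 0m38.488s
real 0m38.434s
real 0m38.378s
real 0m38.106s
real 0m38.082s
real 0m38.173s
real 0m38.161s
"""


def parse_real_seconds(time_output: str) -> List[float]:
    """Extract wall-clock times in seconds from `time` output."""
    seconds = []
    for line in time_output.splitlines():
        match = REAL_LINE.match(line.strip())
        if match:
            seconds.append(int(match.group(1)) * 60 + float(match.group(2)))
    return seconds


def summarize(samples: List[float]) -> Dict[str, float]:
    """Compute the statistics reported in the result table."""
    return {
        "AVERAGE": statistics.mean(samples),
        "VAR": statistics.variance(samples),  # sample variance (n - 1)
        "MAX": max(samples),
        "MIN": min(samples),
        "STDEV": statistics.stdev(samples),  # sample standard deviation
    }


if __name__ == "__main__":
    before = summarize(parse_real_seconds(BEFORE))
    after = summarize(parse_real_seconds(AFTER))
    for name in before:
        change = (after[name] - before[name]) / before[name] * 100
        print(f"{name:8s} {before[name]:7.2f} {after[name]:7.2f} {change:+.1f}%")
```

The relative change is computed as (after - before) / before, so a negative value means the run became faster or less variable.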
---
Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6361
- [x] Description above provides context of the change
- [x] Commit message starts with `[AIRFLOW-NNNN]`, where AIRFLOW-NNNN = JIRA
ID*
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Commits follow "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
(*) For document-only changes, no JIRA issue is needed. Commit message
starts `[AIRFLOW-XXXX]`.
---
In case of fundamental code change, Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
for more information.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Run LocalTaskJob directly in Celery task
> ----------------------------------------
>
> Key: AIRFLOW-6361
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6361
> Project: Apache Airflow
> Issue Type: Improvement
> Components: executors
> Affects Versions: 1.10.6
> Reporter: Kamil Bregula
> Priority: Major
> Labels: performance
>
> Hello,
> Celery first runs the CLI command, which contains LocalTaskJob. LocalTaskJob
> is responsible for starting the next process, which runs the user code. This
> level of isolation is redundant because LocalTaskJob doesn't execute unsafe
> code. The first command is started by creating a new process, not by a fork,
> so it is an expensive operation.
> According to preliminary measurements, this change results in an increase in
> performance close to 30%.
> I will provide more information in PR.
> Best regards
> Kamil Bregula
--
This message was sent by Atlassian Jira
(v8.3.4#803005)