[
https://issues.apache.org/jira/browse/AIRFLOW-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
t oo updated AIRFLOW-6454:
--------------------------
Description:
*LUIGI* vs *AIRFLOW*
200 sequential tasks:
LUIGI:
mkdir -p test_output8
#*8.3secs* total time for all 200 (from Downloads folder)
time python3 -m luigi --module cloop --local-scheduler ManyMany
AIRFLOW:
#*1032 sec* total time for all 200: .16s per task, but a 5sec gap between tasks
#intention was for the tasks in the DAG to be completely sequential, i.e. task 3
must wait for task 2, which must wait for task 1, etc., but chain() is not
working as intended?
airflow trigger_dag looper2
#look at dagrun start-endtime
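The Airflow figures above are consistent with each other: 200 tasks at .16s each plus a 5sec gap per task adds up to the reported 1032 sec, so nearly all of the elapsed time is the gap rather than task execution. A quick arithmetic check, using only the numbers quoted in this description (the variable names are illustrative):

```python
# Figures quoted in the description above.
n_tasks = 200
luigi_total_s = 8.3
airflow_total_s = 1032
airflow_task_s = 0.16  # time spent running each task
airflow_gap_s = 5      # idle gap between consecutive tasks

# Per-task cost implied by each total.
airflow_per_task_s = airflow_total_s / n_tasks
luigi_per_task_s = luigi_total_s / n_tasks

# Total predicted from run time plus gap, to compare with the measured total.
airflow_predicted_s = n_tasks * (airflow_task_s + airflow_gap_s)

# How much slower the Airflow run was overall.
slowdown = airflow_total_s / luigi_total_s

print("Airflow per task: {:.2f} s".format(airflow_per_task_s))
print("Luigi per task:   {:.4f} s".format(luigi_per_task_s))
print("Airflow predicted total: {:.0f} s".format(airflow_predicted_s))
print("Airflow / Luigi: {:.0f}x".format(slowdown))
```

On these numbers the Airflow run is roughly 124 times slower than the Luigi run, and the 5sec gap accounts for about 97% of each task's 5.16 sec.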
cloop.py
`import os
#import time
import luigi

# To run:
# cd ~/luigi_workflows
# PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100


class Sleep(luigi.Task):
    #resources = {'foo': 10}
    fname = luigi.Parameter()

    def requires(self):
        #print(self)
        # fname looks like 'marker_<n>'; marker_<n> depends on marker_<n-1>
        zin = self.fname
        ii = int(zin.split('_')[1])
        if ii > 1:
            return Sleep(fname='marker_{}'.format(ii - 1))
        else:
            # marker_0 and marker_1 have no upstream task
            return []

    def full_path(self):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            'test_output8', self.fname)

    def run(self):
        #time.sleep(1)
        # write a one-line marker file so output() reports the task as complete
        with open(self.full_path(), 'w') as f:
            print('', file=f)

    def output(self):
        return luigi.LocalTarget(self.full_path())


class Many(luigi.WrapperTask):
    n = luigi.IntParameter()

    def requires(self):
        for i in range(self.n):
            yield Sleep(fname='marker_{}'.format(i))


class ManyMany(luigi.WrapperTask):
    n = luigi.IntParameter(default=200)

    def requires(self):
        for i in range(self.n):
            yield Many(n=self.n)
`
looper2.py
`import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

args = {
    'owner': 'airflow',
    'retries': 3,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='looper2', default_args=args,
    schedule_interval=None)

# 200 DummyOperators (op1..op200), intended to run strictly one after another
chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])

if __name__ == "__main__":
    dag.cli()
`
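On the chain() question: one possible cause, assuming the 1.10.x helper takes the tasks as separate positional arguments and links each one to the next (as in `chain(t1, t2, t3)`), is that passing a single list gives it only one argument and therefore nothing to link. The sketch below does not import Airflow; `Task`, `chain_sketch` and `count_edges` are hypothetical stand-ins written only to show the difference between calling with a list and calling with the list unpacked:

```python
class Task:
    """Toy stand-in for an operator; it only records its downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)


def chain_sketch(*tasks):
    """Assumed behaviour: link each positional argument to the next one."""
    for up_task, down_task in zip(tasks[:-1], tasks[1:]):
        up_task.set_downstream(down_task)


def count_edges(tasks):
    """Total number of upstream -> downstream links across all tasks."""
    return sum(len(t.downstream) for t in tasks)


# Called as in looper2.py: the whole list is a single argument, so there
# are no adjacent pairs and no dependencies are created.
ops = [Task('op' + str(i)) for i in range(1, 201)]
chain_sketch(ops)
edges_with_list = count_edges(ops)

# Called with the list unpacked: 200 arguments, op1 -> op2 -> ... -> op200.
ops = [Task('op' + str(i)) for i in range(1, 201)]
chain_sketch(*ops)
edges_unpacked = count_edges(ops)

print(edges_with_list, edges_unpacked)
```

If that assumption holds for 1.10.7, writing `chain(*[DummyOperator(...) for i in range(1, 201)])` in looper2.py would give the intended sequential DAG. This is worth confirming against the helper's source and the graph view before re-running the timing.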
> add test for time taken by scheduler to run dag of diff num of tasks (2 vs 20
> vs 200 vs 2000 vs 20000 simple 1 line print tasks)
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6454
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6454
> Project: Apache Airflow
> Issue Type: Improvement
> Components: tests
> Affects Versions: 1.10.7
> Reporter: t oo
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)