[ https://issues.apache.org/jira/browse/AIRFLOW-6920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17292260#comment-17292260 ]
t oo commented on AIRFLOW-6920:
-------------------------------

I was using LocalExecutor

> AIRFLOW Feature Parity with LUIGI & CONTROLM
> ---------------------------------------------
>
>                 Key: AIRFLOW-6920
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6920
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: tests
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Priority: Major
>
> *LUIGI* vs *AIRFLOW*
>
> 200 sequential tasks (so no parallelism):
>
> +LUIGI:+
> mkdir -p test_output8
> pip install luigi
> #no need to start web server, scheduler or meta db
> #*8.3secs* total time for all 200
> time python3 -m luigi --module cloop --local-scheduler ManyMany
>
> +AIRFLOW v2.0.0rc1 with:+
> #*513 sec* total time for all 200, .1-.2s per task but 1-2sec gap between tasks
> #intention was for tasks in the DAG to be completely sequential ie task 3 must wait for task 2 which must wait for task 1..etc but chain() not working as intended?? so used default_pool=1
> airflow initdb
> nohup airflow webserver -p 8080 &
> nohup airflow scheduler &
> airflow dags trigger looper3
> #look at dagrun start-endtime
>
> +AIRFLOW v1.10.7:+
> #*1032 sec* total time for all 200, .16s per task but 5sec gap between tasks
> #intention was for tasks in the DAG to be completely sequential ie task 3 must wait for task 2 which must wait for task 1..etc but chain() not working as intended?? so used default_pool=1
> airflow initdb
> nohup airflow webserver -p 8080 &
> nohup airflow scheduler &
> airflow trigger_dag looper2
> #look at dagrun start-endtime
>
> cloop.py
> {code:java}
> import os
> #import time
> import luigi
> # To run:
> # cd ~/luigi_workflows
> # pythonpath=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100
> class Sleep(luigi.Task):
>     #resources = {'foo': 10}
>     fname = luigi.Parameter()
>     def requires(self):
>         #print(self)
>         zin=self.fname
>         ii=int(zin.split('_')[1])
>         if ii > 1:
>             return Sleep(fname='marker_{}'.format(ii-1))
>         else:
>             []
>     def full_path(self):
>         return os.path.join(os.path.dirname(os.path.realpath(__file__)),
>                             'test_output8', self.fname)
>     def run(self):
>         #time.sleep(1)
>         with open(self.full_path(), 'w') as f:
>             print('', file=f)
>     def output(self):
>         return luigi.LocalTarget(self.full_path())
> class Many(luigi.WrapperTask):
>     n = luigi.IntParameter()
>     def requires(self):
>         for i in range(self.n):
>             yield Sleep(fname='marker_{}'.format(i))
> class ManyMany(luigi.WrapperTask):
>     n = luigi.IntParameter(default=200)
>     def requires(self):
>         for i in range(self.n):
>             yield Many(n=self.n)
> {code}
>
> looper3.py
> {code:java}
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.utils.helpers import chain
> args = {
>     'owner': 'airflow',
>     'retries': 300,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
>     dag_id='looper3', default_args=args,
>     schedule_interval=None)
> def print_context(ds, **kwargs):
>     print(1)
> chain([PythonOperator(python_callable=print_context,task_id='op' + str(i),
>        dag=dag,pool='default_pool') for i in range(1, 201)])
> if __name__ == "__main__":
>     dag.cli()
> {code}
>
> looper2.py
> {code:java}
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.helpers import chain
> args = {
>     'owner': 'airflow',
>     'retries': 3,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
>     dag_id='looper2', default_args=args,
>     schedule_interval=None)
> chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])
> if __name__ == "__main__":
>     dag.cli()
> {code}
>
> I saw similar test in [https://github.com/apache/airflow/pull/5096] but it did not seem to be sequential or using scheduler
>
> Possible test scenarios:
> 1. 1 DAG with 200 tasks running sequentially
> 2. 1 DAG with 200 tasks running all in parallel (200 slots)
> 3. 1 DAG with 200 tasks running all in parallel (48 slots)
> 4. 200 DAGs each with 1 task
> Then repeat above changing 200 to 2000 or 20.etc
>
> Qs:
> 1. any plans for an 'in-memory' scheduler like Luigi's?
> 2. Anyone open to a Luigi Operator?
> 3. Any speedups to make existing scheduler faster? Noting that the tasks here are sequential
>
> ControlM comparison:
> is it envisioned that airflow becomes a replacement for [https://www.bmcsoftware.uk/it-solutions/control-m.html] ?
> execution_date seems similar to Order Date, DAG seems similar to job, tasks in a dag seem similar to a command called by a job but some of the items I see missing:
> 1. integrating public holiday calendars,
> 2. ability to specify schedule like 11am on '2nd weekday of the month', 'last 5 days of the month', 'last business day of the month'
> 3. ability to visualise dependencies between dags (there does not seem to be a high level way to say at 11am schedule DAGc after DAGa and DAGb, then at 3pm schedule DAGd after DAGc only if DAGc was successful )
> 4. ability to click 1 to many dags in a UI and change their state to killed/success (force ok).etc and have it instantly affect task instances (ie stopping them)
> 5. ability to set whole DAGs to 'dummy' on certain days of the week. ie DAGb (runs 7 days a week and do stuff) must run after DAGa for each execdate (DAGa should do stuff on mon-fri but on sat/sun DAGa should 'do' nothing ie entire dag is 'dummy' just to satisfy 'IN condition' of DAGb)
> 6. ability to change the number of tasks within a DAG for a diff exec date without 'stuffing' up the scheduler/metadb
> 7. ability to 'order up' any day in the past/future (for all or some dags) and keep it on 'hold', visualise which dags 'would' be scheduled, see dag dependencies, and choose to run all/some (or just do nothing and delete them) of the DAGs while maintaining dependencies between them and optionally 'forcing ok' some to skip dependencies.
> 8. ability to feed in conf (ie arguments) to a DAG from a UI or change the host the dag runs on
> 9. ability to rerun an entire 'exec date' and maintain audit trail in the db of timings of the 1st run of that exec date, plus allow different conf on 2nd run.
> 10. faster execution,
>    a) it seems if I want 15 different dag ids of 300 tasks each and all should run exact same tasks (just with different conf arguments) the dagbag has to parse 4500 tasks instead of recognising a single set of 300 differed only by conf
>    b) 'push' flow of tasks within a dag, rather than gaps between tasks
>    c) scheduler does not get overloaded with 100k tasks
> 11. dagrun timeout (without maxruns constraint)
> 12. enforce depends on prior exec date of a dag with schedules that may only be weekly, certain days a week
> 13. multi pools (ie quantitative resources) on a single dag
> 14. ability to edit schedules via the UI
> 15. audit trail of changes to a DAG (not tasks but things like schedule, runas user)
>
> At the moment:
> ControlM=Enterprise features, stability, speed but no python definitions of tasks
> Luigi=Speed and python definitions of tasks but no scheduling
> Airflow=Community momentum and python definitions of tasks but not fast and lacking some features of ControlM

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
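
On the "chain() not working as intended??" note in the benchmark setup: one possible explanation, assuming the `chain` helper links its positional arguments pairwise (its documented usage is of the form `chain(task_1, task_2, task_3)`), is that passing a single list hands it only one argument, so there are no consecutive pairs to link. The sketch below uses a hypothetical stand-in named `chain_pairs`, not Airflow's own code, to show the difference between calling it with the list and with the list unpacked.

```python
def chain_pairs(*tasks):
    """Hypothetical stand-in for a pairwise chain helper (not Airflow's code).

    Returns the (upstream, downstream) pairs that would be linked.
    """
    return list(zip(tasks[:-1], tasks[1:]))


# Task ids matching the DAGs in the report: op1 .. op200.
ops = ['op' + str(i) for i in range(1, 201)]

# One positional argument (the list itself): nothing to pair up.
as_single_list = chain_pairs(ops)

# 200 positional arguments: consecutive pairs, i.e. a sequential chain.
unpacked = chain_pairs(*ops)

print(len(as_single_list), len(unpacked))
print(unpacked[0], unpacked[-1])
```

If that assumption holds, writing `chain(*[...])` in looper2.py and looper3.py would give the intended sequential order without relying on default_pool=1. This has not been verified here against either Airflow version.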