[
https://issues.apache.org/jira/browse/AIRFLOW-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013454#comment-17013454
]
Kamil Bregula commented on AIRFLOW-6454:
----------------------------------------
Can you send this comparison to the mailing list? I think it will be helpful
for many people. Some of these are low-hanging fruit and will be easy to add.
Some features are already available, for example DAG dependencies:
https://github.com/ms32035/airflow-dag-dependencies
Some features are work in progress:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17+Airflow+sensor+optimization
> add test for time taken by scheduler to run dag of diff num of tasks (2 vs 20
> vs 200 vs 2000 vs 20000 simple 1 line print tasks)
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6454
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6454
> Project: Apache Airflow
> Issue Type: Improvement
> Components: tests
> Affects Versions: 1.10.7
> Reporter: t oo
> Priority: Major
>
> *LUIGI* vs *AIRFLOW*
>
> 200 sequential tasks (so no parallelism):
>
> +LUIGI:+
> mkdir -p test_output8
> pip install luigi
> #no need to start web server, scheduler or meta db
> #*8.3secs* total time for all 200
> time python3 -m luigi --module cloop --local-scheduler ManyMany
>
> +AIRFLOW:+
> #*1032 sec* total time for all 200: 0.16s per task but a 5sec gap between tasks
> #intention was for the tasks in the DAG to be completely sequential, ie task 3
> must wait for task 2, which must wait for task 1, etc. but chain() did not
> seem to work as intended, so default_pool=1 was used
> airflow initdb
> nohup airflow webserver -p 8080 &
> nohup airflow scheduler &
> airflow trigger_dag looper2
> #look at dagrun start-endtime
>
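The figures reported above are consistent with the gap between tasks, rather than the tasks themselves, dominating the Airflow run. A quick check in Python, using only the numbers quoted in the description (the variable names are mine):

```python
tasks = 200
run_per_task = 0.16   # seconds of actual work per task (as reported)
gap_per_task = 5.0    # seconds between consecutive tasks (as reported)
luigi_total = 8.3     # seconds for the whole Luigi run (as reported)

airflow_total = tasks * (run_per_task + gap_per_task)
gap_share = tasks * gap_per_task / airflow_total

print(round(airflow_total))                # 1032
print(round(gap_share, 3))                 # 0.969
print(round(airflow_total / luigi_total))  # 124
```

So roughly 97% of the wall-clock time is spent waiting between tasks, and the run is about 124 times slower than the Luigi one.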
> cloop.py
> {code:java}
> import os
> #import time
> import luigi
>
> # To run:
> # cd ~/luigi_workflows
> # PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100
>
> class Sleep(luigi.Task):
>     #resources = {'foo': 10}
>     fname = luigi.Parameter()
>
>     def requires(self):
>         # marker_N depends on marker_(N-1) for N > 1, forming one sequential chain
>         ii = int(self.fname.split('_')[1])
>         if ii > 1:
>             return Sleep(fname='marker_{}'.format(ii - 1))
>         return []
>
>     def full_path(self):
>         return os.path.join(os.path.dirname(os.path.realpath(__file__)),
>                             'test_output8', self.fname)
>
>     def run(self):
>         #time.sleep(1)
>         # Write a marker file so that output() reports the task as complete
>         with open(self.full_path(), 'w') as f:
>             print('', file=f)
>
>     def output(self):
>         return luigi.LocalTarget(self.full_path())
>
> class Many(luigi.WrapperTask):
>     n = luigi.IntParameter()
>
>     def requires(self):
>         for i in range(self.n):
>             yield Sleep(fname='marker_{}'.format(i))
>
> class ManyMany(luigi.WrapperTask):
>     n = luigi.IntParameter(default=200)
>
>     def requires(self):
>         for i in range(self.n):
>             yield Many(n=self.n)
> {code}
> looper2.py
> {code:java}
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.helpers import chain
>
> args = {
>     'owner': 'airflow',
>     'retries': 3,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
>     dag_id='looper2', default_args=args,
>     schedule_interval=None)
>
> # chain() takes the tasks as separate positional arguments, so the list is
> # unpacked with *; passing the list itself sets no dependencies
> chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])
>
> if __name__ == "__main__":
>     dag.cli()
> {code}
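On the chain() question: as far as I can tell, in Airflow 1.10.x `airflow.utils.helpers.chain` is declared as `chain(*tasks)` and links each positional argument to the next one. The sketch below mimics that pairing logic in plain Python, so it runs without Airflow. `pair_up` is a hypothetical stand-in for illustration, not the library function. It shows why a single list argument yields no dependencies, while an unpacked list yields 199 edges for 200 tasks.

```python
def pair_up(*tasks):
    """Return the (upstream, downstream) pairs a chain(*tasks)-style helper would link."""
    return list(zip(tasks[:-1], tasks[1:]))

task_ids = ['op' + str(i) for i in range(1, 201)]

edges_from_list = pair_up(task_ids)   # one positional argument: the list itself
edges_unpacked = pair_up(*task_ids)   # 200 positional arguments

print(len(edges_from_list))  # 0
print(len(edges_unpacked))   # 199
print(edges_unpacked[0])     # ('op1', 'op2')
```

With no edges, all 200 tasks are independent, and a pool of one slot only serialises them without fixing their order.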
> I saw a similar test in
> https://github.com/apache/airflow/pull/5096 but it did not seem to run tasks
> sequentially or to use the scheduler.
> Possible test scenarios:
> 1. 1 DAG with 200 tasks running sequentially
> 2. 1 DAG with 200 tasks running all in parallel (200 slots)
> 3. 1 DAG with 200 tasks running all in parallel (48 slots)
> 4. 200 DAGs each with 1 task
> Then repeat the above, changing 200 to 20, 2000, etc.
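The scenarios above could be enumerated as a parameter grid for such a test. The sketch below only builds the grid and leaves out the timing of a scheduler run. `Scenario`, its field names and `build_scenarios` are hypothetical, and `slots` is `None` wherever the list does not state a slot count.

```python
from collections import namedtuple

# Hypothetical description of one benchmark case.
Scenario = namedtuple('Scenario', ['num_dags', 'tasks_per_dag', 'layout', 'slots'])

def build_scenarios(n):
    """Return the four scenarios from the list above for a given size n."""
    return [
        Scenario(1, n, 'sequential', None),  # 1. one DAG, n tasks in a chain
        Scenario(1, n, 'parallel', n),       # 2. one DAG, n tasks, n slots
        Scenario(1, n, 'parallel', 48),      # 3. one DAG, n tasks, 48 slots
        Scenario(n, 1, 'single-task', None), # 4. n DAGs with one task each
    ]

SIZES = [20, 200, 2000]
all_scenarios = [s for n in SIZES for s in build_scenarios(n)]

for s in all_scenarios:
    print(s)
```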
> Qs:
> 1. Any plans for an 'in-memory' scheduler like Luigi's?
> 2. Is anyone open to a Luigi Operator?
> 3. Any speedups planned to make the existing scheduler faster? Note that the
> tasks here are sequential (the time should be similar to 200 DAGs of 1 task each)
> ControlM comparison:
> Is it envisioned that Airflow becomes a replacement for
> https://www.bmcsoftware.uk/it-solutions/control-m.html ?
> execution_date seems similar to Order Date, a DAG seems similar to a job, and
> tasks in a DAG seem similar to commands called by a job, but some of the items
> I see missing are:
> 1. integrating public holiday calendars,
> 2. ability to specify schedule like 11am on '2nd weekday of the month',
> 'last 5 days of the month', 'last business day of the month'
> 3. ability to visualise dependencies between dags (there does not seem to
> be a high level way to say at 11am schedule DAGc after DAGa and DAGb, then at
> 3pm schedule DAGd after DAGc only if DAGc was successful )
> 4. ability to click 1 to many dags in a UI and change their state to
> killed/success (force ok).etc and have it instantly affect task instances (ie
> stopping them)
> 5. ability to set whole DAGs to 'dummy' on certain days of the week, ie
> DAGb (runs 7 days a week and does stuff) must run after DAGa for each exec date
> (DAGa should do stuff on Mon-Fri, but on Sat/Sun DAGa should 'do' nothing, ie
> the entire DAG is a 'dummy' just to satisfy the 'IN condition' of DAGb)
> 6. ability to change the number of tasks within a DAG for a different exec
> date without 'stuffing up' the scheduler/metadb
> 7. ability to 'order up' any day in the past/future (for all or some dags)
> and keep it on 'hold', visualise which dags 'would' be scheduled, see dag
> dependencies, and choose to run all/some (or just do nothing and delete them)
> of the DAGs while maintaining dependencies between them and optionally
> 'forcing ok' some to skip dependencies.
> 8. ability to feed in conf (ie arguments) to a DAG from a UI or change the
> host the dag runs on
> 9. ability to rerun an entire 'exec date' and maintain audit trail in the
> db of timings of the 1st run of that exec date, plus allow different conf on
> 2nd run.
> 10. faster execution:
> a) it seems that if I want 15 different dag ids of 300 tasks each, all running
> the exact same tasks (just with different conf arguments), the dagbag has to
> parse 4500 tasks instead of recognising a single set of 300 that differ only
> by conf
> b) 'push' flow of tasks within a dag, rather than gaps between tasks
> c) a scheduler that does not get overloaded with 100k tasks
> 11. dagrun timeout (without the maxruns constraint)
> 12. enforce a dependency on the prior exec date of a DAG whose schedule may
> only be weekly or on certain days of the week
> 13. multi pools (ie quantitative resources) on a single dag
> 14. ability to edit schedules via the UI
> 15. audit trail of changes to a DAG (not tasks but things like schedule,
> runas user)
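Some of the calendar-style schedules in item 2 can be approximated by computing the target date in plain Python and skipping the run when the date does not match. Below is a minimal sketch using only the standard library. The function names are hypothetical, '2nd weekday' is read as the 2nd business day, weekends are assumed to be Saturday and Sunday, and public holidays (item 1) are ignored unless passed in through the hypothetical `holidays` argument.

```python
import calendar
from datetime import date

def is_business_day(d, holidays=()):
    """True for Monday to Friday dates that are not in the supplied holidays."""
    return d.weekday() < 5 and d not in holidays

def business_days(year, month, holidays=()):
    """All business days of the given month, in ascending order."""
    days_in_month = calendar.monthrange(year, month)[1]
    days = (date(year, month, day) for day in range(1, days_in_month + 1))
    return [d for d in days if is_business_day(d, holidays)]

def nth_business_day(year, month, n, holidays=()):
    """The n-th business day of the month (1-based)."""
    return business_days(year, month, holidays)[n - 1]

def last_business_day(year, month, holidays=()):
    """The last business day of the month."""
    return business_days(year, month, holidays)[-1]

print(nth_business_day(2020, 2, 2))  # 2020-02-04
print(last_business_day(2020, 2))    # 2020-02-28
print(last_business_day(2020, 5))    # 2020-05-29
```

A DAG scheduled daily could compare its execution date against one of these values and short-circuit on a mismatch. That is a workaround, not a replacement for calendar-aware scheduling.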
> At the moment:
> ControlM = enterprise features, stability and speed, but no Python
> definitions of tasks
> Luigi = speed and Python definitions of tasks, but no scheduling
> Airflow = community momentum and Python definitions of tasks, but not fast
> and lacking some features of ControlM