
t oo commented on AIRFLOW-6920:

I was using the LocalExecutor.

> AIRFLOW Feature Parity with LUIGI & CONTROLM 
> ---------------------------------------------
>                 Key: AIRFLOW-6920
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6920
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: tests
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Priority: Major
> 200 sequential tasks (so no parallelism):
> +LUIGI:+
>  mkdir -p test_output8
>  pip install luigi
>  #no need to start web server, scheduler or meta db
>  #*8.3secs* total time for all 200
>  time python3 -m luigi --module cloop --local-scheduler ManyMany
> +AIRFLOW v2.0.0rc1 with:+
>  #*513 sec* total time for all 200: 0.1-0.2s per task but a 1-2sec gap 
> between tasks
>  #intention was for tasks in the DAG to be completely sequential, i.e. task 3 
> must wait for task 2, which must wait for task 1, etc., but chain() did not 
> seem to work as intended, so I limited default_pool to 1 slot
>  airflow db init
>  nohup airflow webserver -p 8080 &
>  nohup airflow scheduler &
>  airflow dags trigger looper3
>  #look at dagrun start-endtime
> +AIRFLOW v1.10.7:+
>  #*1032 sec* total time for all 200: 0.16s per task but a 5sec gap between 
> tasks
>  #intention was for tasks in the DAG to be completely sequential, i.e. task 3 
> must wait for task 2, which must wait for task 1, etc., but chain() did not 
> seem to work as intended, so I limited default_pool to 1 slot
>  airflow initdb
>  nohup airflow webserver -p 8080 &
>  nohup airflow scheduler &
>  airflow trigger_dag looper2
>  #look at dagrun start-endtime
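
As a quick sanity check on the figures quoted above, the totals can be turned into an average per-task cost and a slowdown relative to the Luigi run. This is only a sketch: the variable names are mine, and the only inputs are the three totals and the task count from the description.

```python
# Total wall-clock seconds for the 200-task run, as quoted in the description.
TASKS = 200
totals = {
    "luigi": 8.3,
    "airflow-2.0.0rc1": 513.0,
    "airflow-1.10.7": 1032.0,
}

# Average wall-clock time per task, including any gap before the next task starts.
per_task = {name: total / TASKS for name, total in totals.items()}

# How many times slower each run is than the Luigi baseline.
slowdown = {name: total / totals["luigi"] for name, total in totals.items()}

for name in totals:
    print(f"{name}: {per_task[name]:.3f} s/task, {slowdown[name]:.1f}x luigi")
```

For 1.10.7 this works out to 5.16 s per task, which matches the reported 0.16 s of work plus a 5 s gap, so almost all of the wall-clock time is spent between tasks rather than in them.
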
> cloop.py
> {code:java}
> import os
> #import time
> import luigi
> # To run:
> # cd ~/luigi_workflows
> # PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany \
> #     --workers=100
> class Sleep(luigi.Task):
>     #resources = {'foo': 10}
>     fname = luigi.Parameter()
>     def requires(self):
>         #print(self)
>         zin=self.fname
>         ii=int(zin.split('_')[1])
>         if ii > 1:
>             return Sleep(fname='marker_{}'.format(ii-1))
>         else:
>             return []
>     def full_path(self):
>         return os.path.join(os.path.dirname(os.path.realpath(__file__)), 
> 'test_output8', self.fname)
>     def run(self):
>         #time.sleep(1)
>         with open(self.full_path(), 'w') as f:
>             print('', file=f)
>     def output(self):
>         return luigi.LocalTarget(self.full_path())
> class Many(luigi.WrapperTask):
>     n = luigi.IntParameter()
>     def requires(self):
>         for i in range(self.n):
>             yield Sleep(fname='marker_{}'.format(i))
> class ManyMany(luigi.WrapperTask):
>     n = luigi.IntParameter(default=200)
>     def requires(self):
>         for i in range(self.n):
>             yield Many(n=self.n)
> {code}
> looper3.py
> {code:java}
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.utils.helpers import chain
> args = {
>     'owner': 'airflow',
>     'retries': 300,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
>     dag_id='looper3', default_args=args,
>     schedule_interval=None)
> def print_context(ds, **kwargs):
>     print(1)
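> # chain() takes tasks as separate arguments, so unpack the list with *;
> # passing the list itself creates no dependencies between the tasks.
> chain(*[PythonOperator(python_callable=print_context, task_id='op' + str(i),
>                        dag=dag, pool='default_pool') for i in range(1, 201)])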
> if __name__ == "__main__":
>     dag.cli()
> {code}
> looper2.py
> {code:java}
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.helpers import chain
> args = {
>     'owner': 'airflow',
>     'retries': 3,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
>     dag_id='looper2', default_args=args,
>     schedule_interval=None)
> # chain() takes tasks as separate arguments, so unpack the list with *.
> chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])
> if __name__ == "__main__":
>     dag.cli()
> {code}
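
A likely reason chain() appeared not to work: it is documented as chain(task_1, task_2, ...), i.e. it links each positional argument to the next one. If a single list is passed, there is only one argument and nothing to pair up. The sketch below shows this with stand-ins only; FakeTask and chain_sketch are hypothetical names, and the pairing loop is my assumption about how the helper behaves, not a copy of the Airflow source.

```python
class FakeTask:
    """Stand-in for an operator; it only records downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.task_id)


def chain_sketch(*tasks):
    """Link each positional argument to the one after it (assumed behaviour)."""
    for up_task, down_task in zip(tasks[:-1], tasks[1:]):
        up_task.set_downstream(down_task)


def count_edges(tasks):
    """Total number of upstream -> downstream links that were created."""
    return sum(len(t.downstream) for t in tasks)


# Passing the list itself: chain_sketch sees one argument, so no links are made.
listed = [FakeTask('op' + str(i)) for i in range(1, 201)]
chain_sketch(listed)
edges_with_list = count_edges(listed)

# Unpacking the list: 200 arguments, so each task is linked to the next.
unpacked = [FakeTask('op' + str(i)) for i in range(1, 201)]
chain_sketch(*unpacked)
edges_unpacked = count_edges(unpacked)

print(edges_with_list, edges_unpacked)
```

With no links between tasks, all 200 are independent, which would explain why limiting the pool to one slot was needed to force them to run one at a time.
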
> I saw a similar test in 
>  [https://github.com/apache/airflow/pull/5096], but it did not seem to be 
> sequential or to use the scheduler.
> Possible test scenarios:
>  1. 1 DAG with 200 tasks running sequentially
>  2. 1 DAG with 200 tasks running all in parallel (200 slots)
>  3. 1 DAG with 200 tasks running all in parallel (48 slots)
>  4. 200 DAGs each with 1 task
>  Then repeat the above, changing 200 to 2000, 20, etc.
> Questions: 
>  1. Are there any plans for an 'in-memory' scheduler like Luigi's? 
>  2. Is anyone open to a Luigi Operator? 
>  3. Are there any speedups planned to make the existing scheduler faster? 
> Note that the tasks here are sequential.
> ControlM comparison:
>  Is it envisioned that Airflow becomes a replacement for 
> [https://www.bmcsoftware.uk/it-solutions/control-m.html]?
>  execution_date seems similar to Order Date, a DAG seems similar to a job, 
> and the tasks in a DAG seem similar to commands called by a job, but these 
> are some of the items I see missing:
>  1. integrating public holiday calendars,
>  2. ability to specify schedule like 11am on '2nd weekday of the month', 
> 'last 5 days of the month', 'last business day of the month'
>  3. ability to visualise dependencies between DAGs (there does not seem to be 
> a high-level way to say: at 11am schedule DAGc after DAGa and DAGb, then at 
> 3pm schedule DAGd after DAGc, only if DAGc was successful)
>  4. ability to select one or many DAGs in a UI and change their state to 
> killed/success (force ok), etc., and have it instantly affect task instances 
> (i.e. stop them)
>  5. ability to set whole DAGs to 'dummy' on certain days of the week, e.g. 
> DAGb (runs 7 days a week and does stuff) must run after DAGa for each exec 
> date; DAGa should do stuff Mon-Fri, but on Sat/Sun DAGa should 'do' nothing, 
> i.e. the entire DAG is a 'dummy' just to satisfy the 'IN condition' of DAGb
>  6. ability to change the number of tasks within a DAG for a different exec 
> date without breaking the scheduler/metadb
>  7. ability to 'order up' any day in the past/future (for all or some dags) 
> and keep it on 'hold', visualise which dags 'would' be scheduled, see dag 
> dependencies, and choose to run all/some (or just do nothing and delete them) 
> of the DAGs while maintaining dependencies between them and optionally 
> 'forcing ok' some to skip dependencies.
>  8. ability to feed in conf (i.e. arguments) to a DAG from a UI, or change 
> the host the DAG runs on
>  9. ability to rerun an entire 'exec date' and maintain an audit trail in the 
> db of the timings of the 1st run of that exec date, plus allow a different 
> conf on the 2nd run
>  10. faster execution:
>  a) it seems that if I want 15 different DAG ids of 300 tasks each, all 
> running the exact same tasks (just with different conf arguments), the dagbag 
> has to parse 4500 tasks instead of recognising a single set of 300 that 
> differ only by conf
>  b) 'push' flow of tasks within a DAG, rather than gaps between tasks
>  c) a scheduler that does not get overloaded with 100k tasks 
>  11. dagrun timeout (without a maxruns constraint)
>  12. enforce depends-on-prior-exec-date for a DAG whose schedule may only be 
> weekly or on certain days of the week
>  13. multiple pools (i.e. quantitative resources) on a single DAG
>  14. ability to edit schedules via the UI
>  15. audit trail of changes to a DAG (not tasks, but things like schedule and 
> run-as user)
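
To make item 2 in the list above concrete, here is a rough stdlib-only sketch of the calendar rules I mean. The function names are hypothetical, "weekday" is read as a Mon-Fri day, and public holidays (item 1) are deliberately ignored, so this is an illustration of the requested behaviour rather than anything Airflow or ControlM provides.

```python
import calendar
from datetime import date, timedelta


def is_business_day(d):
    """Mon-Fri only; public holidays are ignored in this sketch."""
    return d.weekday() < 5


def last_business_day(year, month):
    """Walk back from the last calendar day until a Mon-Fri day is found."""
    d = date(year, month, calendar.monthrange(year, month)[1])
    while not is_business_day(d):
        d -= timedelta(days=1)
    return d


def nth_business_day(year, month, n):
    """Return the n-th Mon-Fri day of the month (n starts at 1)."""
    d = date(year, month, 1)
    seen = 0
    while d.month == month:
        if is_business_day(d):
            seen += 1
            if seen == n:
                return d
        d += timedelta(days=1)
    raise ValueError("month has fewer than n business days")


def last_n_days(year, month, n):
    """Return the last n calendar days of the month, oldest first."""
    last = calendar.monthrange(year, month)[1]
    return [date(year, month, day) for day in range(last - n + 1, last + 1)]


print(last_business_day(2020, 2), nth_business_day(2020, 2, 2))
```

A scheduler supporting these rules would still need the time of day (11am in the example) and a holiday calendar layered on top.
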
> At the moment:
>  ControlM = enterprise features, stability and speed, but no Python 
> definitions of tasks
>  Luigi = speed and Python definitions of tasks, but no scheduling
>  Airflow = community momentum and Python definitions of tasks, but not fast 
> and lacking some features of ControlM

This message was sent by Atlassian Jira