[ https://issues.apache.org/jira/browse/AIRFLOW-6920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

t oo updated AIRFLOW-6920:
--------------------------
    Description: 
*LUIGI* vs *AIRFLOW*

 

200 sequential tasks (so no parallelism):

 

+LUIGI:+
 mkdir -p test_output8
 pip install luigi
 #no need to start web server, scheduler or meta db
 #*8.3secs* total time for all 200
 time python3 -m luigi --module cloop --local-scheduler ManyMany

 

+AIRFLOW v2.0.0rc1:+
 #*513 sec* total time for all 200; 0.1-0.2 s per task, but a 1-2 s gap between tasks
 #intention was for the tasks in the DAG to be completely sequential, i.e. task 3 must wait for task 2, which must wait for task 1, etc., but chain() did not appear to work as intended, so default_pool=1 was used
 airflow db init
 nohup airflow webserver -p 8080 &
 nohup airflow scheduler &
 airflow dags trigger looper3
 #look at dagrun start-endtime

 

+AIRFLOW v1.10.7:+
 #*1032 sec* total time for all 200; 0.16 s per task, but a 5 s gap between tasks
 #intention was for the tasks in the DAG to be completely sequential, i.e. task 3 must wait for task 2, which must wait for task 1, etc., but chain() did not appear to work as intended, so default_pool=1 was used
 airflow initdb
 nohup airflow webserver -p 8080 &
 nohup airflow scheduler &
 airflow trigger_dag looper2
 #look at dagrun start-endtime

 

cloop.py
{code:python}
import os
#import time

import luigi

# To run:
# cd ~/luigi_workflows
# PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100

class Sleep(luigi.Task):
    #resources = {'foo': 10}

    fname = luigi.Parameter()

    def requires(self):
        #print(self)
        zin = self.fname
        ii = int(zin.split('_')[1])
        if ii > 1:
            return Sleep(fname='marker_{}'.format(ii - 1))
        else:
            return []  # first marker has no upstream dependency

    def full_path(self):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            'test_output8', self.fname)

    def run(self):
        #time.sleep(1)
        with open(self.full_path(), 'w') as f:
            print('', file=f)

    def output(self):
        return luigi.LocalTarget(self.full_path())


class Many(luigi.WrapperTask):
    n = luigi.IntParameter()

    def requires(self):
        for i in range(self.n):
            yield Sleep(fname='marker_{}'.format(i))


class ManyMany(luigi.WrapperTask):
    n = luigi.IntParameter(default=200)

    def requires(self):
        for i in range(self.n):
            yield Many(n=self.n)
{code}
looper3.py
{code:python}
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.helpers import chain

args = {
    'owner': 'airflow',
    'retries': 300,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='looper3', default_args=args,
    schedule_interval=None)

def print_context(ds, **kwargs):
    print(1)


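# Note (assumption): airflow.utils.helpers.chain() expects the operators to be
# passed unpacked, e.g. chain(*ops); handing it a single list appears to create
# no dependencies at all, which would explain why the tasks did not run
# sequentially and why default_pool=1 was needed as a workaround.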
chain([PythonOperator(python_callable=print_context, task_id='op' + str(i),
                      dag=dag, pool='default_pool') for i in range(1, 201)])

if __name__ == "__main__":
    dag.cli()

{code}
looper2.py
{code:python}
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

args = {
    'owner': 'airflow',
    'retries': 3,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='looper2', default_args=args,
    schedule_interval=None)

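# As in looper3.py, chain() is given a single list here; chain(*[...]) would
# likely be needed for the operators to actually run sequentially.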
chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])

if __name__ == "__main__":
    dag.cli()
{code}
I saw a similar test in [https://github.com/apache/airflow/pull/5096] but it did not seem to be sequential or to use the scheduler.

Possible test scenarios:
 1. 1 DAG with 200 tasks running sequentially
 2. 1 DAG with 200 tasks running all in parallel (200 slots)
 3. 1 DAG with 200 tasks running all in parallel (48 slots)
 4. 200 DAGs each with 1 task
 Then repeat the above, changing 200 to 2000 or 20, etc. (a sketch of a DAG generator for these scenarios follows below)
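
A minimal sketch of what scenarios 1-3 could look like as generated DAGs, assuming the Airflow 1.10-style API already used in this report (the dag_ids bench_seq/bench_par, the pool name bench_pool and N_TASKS are illustrative, not part of the original measurements):
{code:python}
# Hypothetical benchmark DAG generator (sketch only).
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

N_TASKS = 200  # change to 20 / 2000 to repeat the scenarios at other sizes

args = {
    'owner': 'airflow',
    'start_date': days_ago(2)
}

# Scenario 1: one DAG with N_TASKS strictly sequential tasks, using explicit
# pairwise dependencies instead of chain().
seq_dag = DAG(dag_id='bench_seq', default_args=args, schedule_interval=None)
prev = None
for i in range(1, N_TASKS + 1):
    op = DummyOperator(task_id='op' + str(i), dag=seq_dag)
    if prev is not None:
        prev >> op
    prev = op

# Scenarios 2/3: one DAG with N_TASKS independent tasks; parallelism is then
# bounded by the pool size (200 or 48 slots), not by dependencies.
# 'bench_pool' must be created beforehand with the desired number of slots.
par_dag = DAG(dag_id='bench_par', default_args=args, schedule_interval=None)
for i in range(1, N_TASKS + 1):
    DummyOperator(task_id='op' + str(i), dag=par_dag, pool='bench_pool')
{code}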

Qs:
 1. Any plans for an 'in-memory' scheduler like Luigi's?
 2. Anyone open to a Luigi Operator?
 3. Any speedups to make the existing scheduler faster? Noting that the tasks here are sequential.

ControlM comparison:
 Is it envisioned that Airflow becomes a replacement for [https://www.bmcsoftware.uk/it-solutions/control-m.html]?
 execution_date seems similar to Order Date, a DAG seems similar to a job, and tasks in a DAG seem similar to commands called by a job, but some of the items I see missing:
 1. integrating public holiday calendars,
 2. ability to specify schedule like 11am on '2nd weekday of the month', 'last 
5 days of the month', 'last business day of the month'
 3. ability to visualise dependencies between DAGs (there does not seem to be a high-level way to say: at 11am schedule DAGc after DAGa and DAGb, then at 3pm schedule DAGd after DAGc only if DAGc was successful)
 4. ability to select one or many DAGs in the UI and change their state to killed/success (force ok), etc., and have it instantly affect task instances (i.e. stopping them)
 5. ability to set whole DAGs to 'dummy' on certain days of the week, e.g. DAGb (which runs 7 days a week and does real work) must run after DAGa for each exec date, but DAGa should only do real work Mon-Fri; on Sat/Sun the entire DAGa should 'do' nothing, existing only to satisfy the 'IN condition' of DAGb (a rough workaround sketch follows after this list)
 6. ability to change the number of tasks within a DAG for a different exec date without breaking the scheduler/meta db
 7. ability to 'order up' any day in the past/future (for all or some DAGs), keep it on 'hold', visualise which DAGs 'would' be scheduled, see DAG dependencies, and choose to run all or some of the DAGs (or do nothing and delete them) while maintaining dependencies between them, optionally 'forcing ok' some to skip dependencies.
 8. ability to feed in conf (ie arguments) to a DAG from a UI or change the 
host the dag runs on
 9. ability to rerun an entire 'exec date' and maintain audit trail in the db 
of timings of the 1st run of that exec date, plus allow different conf on 2nd 
run.
 10. faster execution,
 a) it seems that if I want 15 different dag_ids of 300 tasks each, all running the exact same tasks (just with different conf arguments), the DagBag has to parse 4500 tasks instead of recognising a single set of 300 that differ only by conf
 b) 'push' flow of tasks within a dag, rather than gaps between tasks
 c) scheduler does not get overloaded with 100k tasks 
 11. dagrun timeout (without maxruns constraint)
 12. enforce 'depends on prior exec date' for a DAG whose schedule may only be weekly or on certain days of the week
 13. multi pools (ie quantitative resources) on a single dag
 14. ability to edit schedules via the UI
 15. audit trail of changes to a DAG (not tasks but things like schedule, runas 
user)

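Regarding item 5 above, the closest workaround I can think of with the current API is only an approximation: a ShortCircuitOperator at the head of the DAG that skips the real work on Sat/Sun, so the DAG still 'runs' every day but does nothing on weekends. Downstream tasks end up 'skipped' rather than 'success', which matters if another DAG senses their state. The dag_id and task ids below are illustrative, not from this report:
{code:python}
# Sketch of a DAG that effectively becomes a no-op on weekends.
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(2)
}

dag = DAG(dag_id='daga_weekday_only', default_args=args,
          schedule_interval='0 11 * * *')

def is_weekday(ds, **kwargs):
    # Monday=0 .. Sunday=6; returning False on Sat/Sun skips everything downstream
    return datetime.strptime(ds, '%Y-%m-%d').weekday() < 5

# provide_context is needed on 1.10 so that 'ds' is passed to the callable
gate = ShortCircuitOperator(task_id='weekday_gate',
                            python_callable=is_weekday,
                            provide_context=True, dag=dag)

do_stuff = DummyOperator(task_id='do_stuff', dag=dag)

gate >> do_stuff
{code}
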
At the moment:
 ControlM = enterprise features, stability and speed, but no Python definitions of tasks
 Luigi = speed and Python definitions of tasks, but no scheduling
 Airflow = community momentum and Python definitions of tasks, but not fast and lacking some features of ControlM

> AIRFLOW Feature Parity with LUIGI & CONTROLM 
> ---------------------------------------------
>
>                 Key: AIRFLOW-6920
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6920
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: tests
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Priority: Major
>