[ https://issues.apache.org/jira/browse/AIRFLOW-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

t oo updated AIRFLOW-6454:
--------------------------
    Description: 
*LUIGI* vs *AIRFLOW*

 

200 sequential tasks (so no parallelism):

 

+LUIGI:+
 mkdir -p test_output8
 #*8.3 s* total time for all 200 tasks (run from the Downloads folder)
 time python3 -m luigi --module cloop --local-scheduler ManyMany

 

+AIRFLOW:+
 #*1032 s* total time for all 200 tasks: ~0.16 s per task, but a ~5 s gap between tasks
 #the intention was for the tasks in the DAG to be completely sequential, i.e. task 3 must wait for task 2, which must wait for task 1, etc., but chain() did not appear to work as intended (likely because chain() takes the operators as separate positional arguments rather than a single list; see the unpacking fix in looper2.py below)
 airflow trigger_dag looper2
 #look at the dagrun start/end time (a sketch for pulling it from the metadata DB follows)
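
A minimal sketch (editor's addition; assumes an initialized Airflow 1.10.x metadata DB and the stock ORM session helper) for pulling the latest looper2 dag run's start/end time instead of reading it off the UI:
{code:python}
# sketch only: query the metadata DB for the latest looper2 dag run and print its duration
from airflow.models import DagRun
from airflow.utils.db import provide_session


@provide_session
def dagrun_duration(dag_id, session=None):
    dr = (session.query(DagRun)
          .filter(DagRun.dag_id == dag_id)
          .order_by(DagRun.execution_date.desc())
          .first())
    if dr is None or dr.end_date is None:
        return None  # no run yet, or still running
    return (dr.end_date - dr.start_date).total_seconds()


print(dagrun_duration('looper2'))
{code}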

 

cloop.py
{code:python}
import os
#import time

import luigi

# To run:
# cd ~/luigi_workflows
# PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100

class Sleep(luigi.Task):
    """Writes a marker file; each task requires the previous marker, forming a chain."""
    #resources = {'foo': 10}

    fname = luigi.Parameter()

    def requires(self):
        #print(self)
        zin = self.fname
        ii = int(zin.split('_')[1])
        if ii > 1:
            return Sleep(fname='marker_{}'.format(ii - 1))
        else:
            return []

    def full_path(self):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            'test_output8', self.fname)

    def run(self):
        #time.sleep(1)
        with open(self.full_path(), 'w') as f:
            print('', file=f)

    def output(self):
        return luigi.LocalTarget(self.full_path())


class Many(luigi.WrapperTask):
    n = luigi.IntParameter()

    def requires(self):
        for i in range(self.n):
            yield Sleep(fname='marker_{}'.format(i))


class ManyMany(luigi.WrapperTask):
    n = luigi.IntParameter(default=200)

    def requires(self):
        for i in range(self.n):
            yield Many(n=self.n)
{code}
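
One caveat when re-timing the Luigi run (editor's note): Luigi treats a Sleep task as complete once its LocalTarget marker file exists, so test_output8 has to be emptied between timed runs or most of the 200 tasks are skipped. A minimal sketch:
{code:python}
# sketch only: remove the marker files so every Sleep task actually re-runs
import glob
import os

for marker in glob.glob(os.path.join('test_output8', 'marker_*')):
    os.remove(marker)
{code}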
looper2.py
{code:python}
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

args = {
    'owner': 'airflow',
    'retries': 3,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='looper2', default_args=args,
    schedule_interval=None)

# unpack the list so chain() wires op1 -> op2 -> ... -> op200 into one sequential chain
chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])

if __name__ == "__main__":
    dag.cli()
{code}
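
To confirm the chain really is one linear sequence before timing anything, a quick sanity check along these lines can be run (a sketch; it assumes looper2.py is importable, e.g. from the dags folder):
{code:python}
# sketch only: verify looper2 is one linear chain of 200 tasks
from looper2 import dag

tasks = dag.topological_sort()
assert len(tasks) == 200
assert all(len(t.downstream_task_ids) == 1 for t in tasks[:-1])
assert len(tasks[-1].downstream_task_ids) == 0
print('chain OK: %d tasks linked sequentially' % len(tasks))
{code}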

I saw a similar test in https://github.com/apache/airflow/pull/5096, but it did not appear to be sequential or to go through the scheduler. A rough sketch of the kind of benchmark this issue asks for follows.
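
Sketch (editor's addition; the helper name time_linear_dag is made up, and running via dag.run() with the SequentialExecutor against an initialized metadata DB is an assumption, not the final test harness):
{code:python}
# sketch only: time a single run of a linear DAG of N DummyOperators for several N
import time

import airflow
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain


def time_linear_dag(num_tasks):
    """Build a linear chain of num_tasks DummyOperators and time one backfill run."""
    start_date = airflow.utils.dates.days_ago(1)
    dag = DAG(dag_id='bench_%d' % num_tasks,
              default_args={'owner': 'airflow'},
              start_date=start_date,
              schedule_interval='@daily')
    chain(*[DummyOperator(task_id='op%d' % i, dag=dag) for i in range(num_tasks)])
    t0 = time.time()
    # run exactly one dag run locally, one task at a time
    dag.run(start_date=start_date, end_date=start_date,
            executor=SequentialExecutor())
    return time.time() - t0


for n in (2, 20, 200):
    print('%d tasks: %.1f s' % (n, time_linear_dag(n)))
{code}
Extending the loop to 2000 and 20000 tasks would give the remaining data points named in the summary.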




> add test for time taken by scheduler to run dag of diff num of tasks (2 vs 20 
> vs 200 vs 2000 vs 20000 simple 1 line print tasks)
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6454
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6454
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: tests
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Priority: Major
>


