ashb commented on issue #5615: [AIRFLOW-5035] Remove multiprocessing.Manager in-favour of Pipes
URL: https://github.com/apache/airflow/pull/5615#issuecomment-515402559
 
 
   I'm running a comparison between 1.10.3, 1.10.4rc3 and this branch with the following DAG:
   
   ```
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import BranchPythonOperator
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   
   def make_dag(id):
       dag = DAG(
           dag_id=id,
           start_date=datetime(2019, 7, 1),
           schedule_interval=timedelta(minutes=10),
           default_args=default_args,
           catchup=False,
       )
   
       with dag:
           start = DummyOperator(task_id='start')
   
           bulk = [
               BashOperator(
                   task_id=f'print_date_{n+1}',
                   bash_command='for i in 1 2 3 4 5 6 7 8 9 10; do sleep $[ ( $RANDOM % 10 ) + 1 ]s; date; done',
               ) for n in range(20)
           ]
   
           start >> bulk
   
           def should_run(**kwargs):
               print('------------- exec dttm = {} and minute = {}'.
                     format(kwargs['execution_date'], kwargs['execution_date'].minute))
               if kwargs['execution_date'].minute % 20 == 0:
                   return "dummy_task_1"
               else:
                   return "dummy_task_2"
   
           cond = BranchPythonOperator(
               task_id='condition',
               provide_context=True,
               python_callable=should_run,
               dag=dag,
           )
   
           cond << bulk
   
           dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
           dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
           cond >> [dummy_task_1, dummy_task_2]
   
       return dag
   
   
   for n in range(20):
       id = f'dag_{n:02d}'
       globals()[id] = make_dag(id)
   ```
   
   With any luck, I should have numbers by the end of the day.
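   
   As a rough per-branch sanity check of parse cost (a minimal sketch, not part of the comparison itself; `dags/` below is a placeholder for wherever the file above lives, and the real numbers come from watching the scheduler), timing `DagBag` loading of the generated files could look like this:
   
   ```
   # Minimal sketch: time how long DagBag takes to parse the generated DAG files.
   # 'dags/' is just a placeholder path; the actual comparison numbers come from
   # the scheduler runs on each branch.
   import time
   
   from airflow.models import DagBag
   
   start = time.time()
   dagbag = DagBag(dag_folder='dags/', include_examples=False)
   elapsed = time.time() - start
   
   print(f'Parsed {len(dagbag.dags)} DAGs in {elapsed:.2f}s')
   if dagbag.import_errors:
       print(f'Import errors: {dagbag.import_errors}')
   ```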
