yuanke7 commented on issue #18422:
URL: https://github.com/apache/airflow/issues/18422#issuecomment-926301771


   > Can you please share the python code of your DAG?
   > I'll see if I can replicate the issue.
   
   Sure, thanks for your help!
   `
   #!/usr/bin/python3
   # -*- coding: utf-8 -*-
   # @Time : 2021/8/20 上午9:55
   # @Author : yuankq1
   # @File : mingle_dag.py
   import pendulum
   from airflow import DAG
   from proj.config import WIN_SERVER_1, SERVER_1_HEADERS
   from proj.utils.dag_related import start_wait
   from datetime import timedelta, datetime, time
   from airflow.sensors.time_sensor import TimeSensor
   from airflow.operators.python import PythonOperator
   from airflow.providers.http.operators.http import SimpleHttpOperator
   
   # Arguments handed to the DAG as ``default_args``.
   default_args = {
       'owner': 'yuankq1',
       'depends_on_past': False,
   }
   
   # Despite its name, ``yest`` is the calendar date seven days before today.
   # It is recomputed every time this module is executed.
   # NOTE(review): Airflow's guidance is to prefer a fixed start_date; confirm
   # that a rolling value is intended here.
   yest = datetime.date(datetime.today() + timedelta(-7))
   dag = DAG(
       dag_id='mingle_dag',
       default_args=default_args,
       description='A mingle dag',
       concurrency=8,
       max_active_runs=1,
       catchup=False,
       # Midnight of ``yest``, made timezone-aware with the Asia/Shanghai zone.
       start_date=datetime(
           yest.year,
           yest.month,
           yest.day,
           tzinfo=pendulum.timezone("Asia/Shanghai"),
       ),
       # Cron expression: minute 0, hour 8, Monday through Friday.
       schedule_interval='0 8 * * 1-5',
       # Tags used to search for DAGs in the web UI.
       tags=['yuankq1', 'mingle', 'varies', 'api'],
   )
   
   # Entry task: runs the imported ``start_wait`` callable.
   start = PythonOperator(task_id='start_wait', python_callable=start_wait, dag=dag)
   
   # Time sensors, one per wall-clock target time.
   # NOTE(review): none of these is constructed with ``dag=``, and sensor_09 and
   # sensor_22 are not referenced by the dependency wiring at the bottom of this
   # file -- confirm that both points are intended.
   sensor_09 = TimeSensor(task_id='wait_until_09_00', target_time=time(hour=9, minute=0))
   sensor_13 = TimeSensor(task_id='wait_until_13_00', target_time=time(hour=13, minute=0))
   sensor_14 = TimeSensor(task_id='wait_until_14_00', target_time=time(hour=14, minute=0))
   sensor_22 = TimeSensor(task_id='wait_until_22_00', target_time=time(hour=22, minute=0))
   sensor_10_30 = TimeSensor(task_id='wait_until_10_30', target_time=time(hour=10, minute=30))
   sensor_15_30 = TimeSensor(task_id='wait_until_15_30', target_time=time(hour=15, minute=30))
   sensor_17 = TimeSensor(task_id='wait_until_17', target_time=time(hour=17, minute=0))
   sensor_17_30 = TimeSensor(task_id='wait_until_17_30', target_time=time(hour=17, minute=30))
   
   # tasks
   def _general_get(name, timeout):
       """Build the GET task for the ``general/<name>`` endpoint.
   
       ``name`` is used both as the task id and as the endpoint suffix, and
       ``timeout`` becomes the task's ``execution_timeout``. ``response_check``
       reads the ``'Success'`` key of the JSON body and ``response_filter``
       returns the parsed JSON body.
       """
       return SimpleHttpOperator(
           task_id=name,
           method='GET',
           http_conn_id=WIN_SERVER_1,
           endpoint=f'general/{name}',
           response_check=lambda response: response.json().get('Success'),
           response_filter=lambda response: response.json(),
           execution_timeout=timeout,
           dag=dag,
       )
   
   
   save_hub_report = _general_get('save_hub_report', timedelta(minutes=30))
   matrix_collection = _general_get('matrix_collection', timedelta(minutes=10))
   psd_report = _general_get('psd_report', timedelta(hours=1))
   
   
   def flow_a(order_type: str = None, execute_time: str = None):
       """Create the ``sap/flow_a`` GET task for one order type and time slot.
   
       The task id is ``flow_a_<order_type>_<execute_time>`` and ``order_type``
       is sent as the request's ``data`` payload. The task is created with
       ``depends_on_past=True`` and a three-hour execution timeout, and is
       attached to the module-level ``dag``.
       """
       task_name = f'flow_a_{order_type}_{execute_time}'
       payload = {'order_type': order_type}
       return SimpleHttpOperator(
           task_id=task_name,
           method='GET',
           http_conn_id=WIN_SERVER_1,
           endpoint='sap/flow_a',
           data=payload,
           response_check=lambda response: response.json().get('Success'),
           response_filter=lambda response: response.json(),
           execution_timeout=timedelta(hours=3),
           depends_on_past=True,
           dag=dag,
       )
   
   
   # One flow_a task per (order type, time slot) pair.
   flow_a_ww2dc_10_30 = flow_a('ww2dc', '10_30')
   flow_a_ww2dc_17_30 = flow_a('ww2dc', '17_30')
   flow_a_other_type_10_30 = flow_a('other_type', '10_30')
   flow_a_other_type_15_30 = flow_a('other_type', '15_30')
   flow_a_other_type_17_30 = flow_a('other_type', '17_30')
   
   def _sap_get(name, timeout):
       """Build the GET task for the ``sap/<name>`` endpoint.
   
       ``name`` is used both as the task id and as the endpoint suffix, and
       ``timeout`` becomes the task's ``execution_timeout``. ``response_check``
       reads the ``'Success'`` key of the JSON body and ``response_filter``
       returns the parsed JSON body.
       """
       return SimpleHttpOperator(
           task_id=name,
           method='GET',
           http_conn_id=WIN_SERVER_1,
           endpoint=f'sap/{name}',
           response_check=lambda response: response.json().get('Success'),
           response_filter=lambda response: response.json(),
           execution_timeout=timeout,
           dag=dag,
       )
   
   
   flow_b = _sap_get('flow_b', timedelta(hours=5))
   flow_d = _sap_get('flow_d', timedelta(hours=3))
   po_change = _sap_get('po_change', timedelta(minutes=15))
   
   
   def flow_a_monitor_func(task_id):
       """Create a GET task against ``sap/flow_a/monitor_repeat``.
   
       ``task_id`` is formatted into the operator's task id. The request is
       sent with ``SERVER_1_HEADERS``, the task has a five-minute execution
       timeout, and it is attached to the module-level ``dag``.
       """
       operator_kwargs = dict(
           task_id=f'{task_id}',
           method='GET',
           http_conn_id=WIN_SERVER_1,
           endpoint='sap/flow_a/monitor_repeat',
           headers=SERVER_1_HEADERS,
           response_check=lambda response: response.json().get('Success'),
           response_filter=lambda response: response.json(),
           execution_timeout=timedelta(minutes=5),
           dag=dag,
       )
       return SimpleHttpOperator(**operator_kwargs)
   
   
   # Two monitor tasks that differ only in their task id.
   flow_a_monitor, flow_a_monitor2 = (
       flow_a_monitor_func(monitor_id)
       for monitor_id in ('flow_a_monitor', 'flow_a_monitor2')
   )
   
   # sel_crawler = SimpleHttpOperator(
   #     task_id='sel_crawler',
   #     method='GET',
   #     http_conn_id=WIN_SERVER_1,
   #     endpoint='general/sel_crawler',
   #     response_check=lambda response : response.json().get('Success'),
   #     response_filter=lambda response : response.json(),
   #     execution_timeout=timedelta(minutes=10),
   #     dag=dag,
   # )
   
   # Tasks that run directly after the entry task.
   start >> matrix_collection
   start >> flow_a_monitor
   
   # Chain of time sensors in chronological order. The chain is parenthesised
   # so that it can span several lines; a line beginning with ``>>`` outside
   # brackets is a syntax error.
   (
       start
       >> sensor_10_30
       >> sensor_13
       >> sensor_14
       >> sensor_15_30
       >> sensor_17
       >> sensor_17_30
   )
   
   # Work hanging off each sensor in the chain.
   sensor_10_30 >> flow_a_ww2dc_10_30 >> flow_a_other_type_10_30
   sensor_13 >> psd_report
   sensor_14 >> save_hub_report
   sensor_15_30 >> flow_a_other_type_15_30
   sensor_17 >> po_change
   sensor_17 >> flow_a_monitor2
   sensor_17_30 >> flow_a_ww2dc_17_30 >> flow_a_other_type_17_30
   sensor_17_30 >> flow_d >> flow_b
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to