yuanke7 edited a comment on issue #18422:
URL: https://github.com/apache/airflow/issues/18422#issuecomment-926301771
> Can you please share the python code of your DAG?
> I'll see if I can replicate the issue.
Sure, thanks for your help!
`
import pendulum
from airflow import DAG
from proj.config import WIN_SERVER_1, SERVER_1_HEADERS
from proj.utils.dag_related import start_wait
from datetime import timedelta, datetime, time
from airflow.sensors.time_sensor import TimeSensor
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'yuankq1',
    'depends_on_past': False,
}

# Calendar date seven days before the moment this file is parsed.
# NOTE(review): this value moves on every parse; the Airflow FAQ recommends a
# fixed start_date -- confirm a rolling date is really intended.
yest = (datetime.today() + timedelta(-7)).date()

dag = DAG(
    dag_id='mingle_dag',
    default_args=default_args,
    description='A mingle dag',
    concurrency=8,
    max_active_runs=1,
    catchup=False,
    # Midnight of ``yest`` carrying the Asia/Shanghai timezone.
    start_date=datetime(
        yest.year,
        yest.month,
        yest.day,
        tzinfo=pendulum.timezone("Asia/Shanghai"),
    ),
    # Cron expression: minute 0, hour 8, Monday through Friday.
    schedule_interval='0 8 * * 1-5',
    # The trailing comment is kept on one line: when its last word wraps onto
    # a line of its own it becomes a stray positional argument (SyntaxError).
    tags=['yuankq1', 'mingle', 'varies', 'api'],  # search dags by tags in web
)
# Entry task: calls the project helper ``start_wait``.
start = PythonOperator(
    task_id='start_wait',
    python_callable=start_wait,
    dag=dag,
)

# Time gates.  Every sensor that takes part in the dependency graph is created
# with ``dag=dag`` like the other operators in this file, instead of relying
# on ``>>`` to attach it later; Airflow reads the DAG's ``default_args`` when
# an operator is constructed with a DAG, so the sensors now pick them up too.

# NOTE(review): sensor_09 is not referenced by any dependency visible in this
# file.  It is deliberately left without a DAG so that no extra task appears
# in the graph -- confirm whether it should be wired in or removed.
sensor_09 = TimeSensor(
    task_id='wait_until_09_00',
    target_time=time(9, 0),
)
sensor_13 = TimeSensor(
    task_id='wait_until_13_00',
    target_time=time(13, 0),
    dag=dag,
)
sensor_14 = TimeSensor(
    task_id='wait_until_14_00',
    target_time=time(14, 0),
    dag=dag,
)
# NOTE(review): same situation as sensor_09 -- unused in the visible wiring,
# therefore left unattached.
sensor_22 = TimeSensor(
    task_id='wait_until_22_00',
    target_time=time(22, 0),
)
sensor_10_30 = TimeSensor(
    task_id='wait_until_10_30',
    target_time=time(10, 30),
    dag=dag,
)
sensor_15_30 = TimeSensor(
    task_id='wait_until_15_30',
    target_time=time(15, 30),
    dag=dag,
)
sensor_17 = TimeSensor(
    task_id='wait_until_17',
    target_time=time(17, 0),
    dag=dag,
)
sensor_17_30 = TimeSensor(
    task_id='wait_until_17_30',
    target_time=time(17, 30),
    dag=dag,
)
# GET tasks against the ``general/*`` endpoints of the WIN_SERVER_1
# connection.  For each one, ``response_check`` returns the 'Success' entry of
# the JSON body and ``response_filter`` returns the parsed JSON body itself.
save_hub_report = SimpleHttpOperator(
    dag=dag,
    task_id='save_hub_report',
    http_conn_id=WIN_SERVER_1,
    method='GET',
    endpoint='general/save_hub_report',
    execution_timeout=timedelta(minutes=30),
    response_check=lambda response: response.json().get('Success'),
    response_filter=lambda response: response.json(),
)

matrix_collection = SimpleHttpOperator(
    dag=dag,
    task_id='matrix_collection',
    http_conn_id=WIN_SERVER_1,
    method='GET',
    endpoint='general/matrix_collection',
    execution_timeout=timedelta(minutes=10),
    response_check=lambda response: response.json().get('Success'),
    response_filter=lambda response: response.json(),
)

psd_report = SimpleHttpOperator(
    dag=dag,
    task_id='psd_report',
    http_conn_id=WIN_SERVER_1,
    method='GET',
    endpoint='general/psd_report',
    execution_timeout=timedelta(hours=1),
    response_check=lambda response: response.json().get('Success'),
    response_filter=lambda response: response.json(),
)
def flow_a(order_type: str = None, execute_time: str = None):
    """Create a GET task for the ``sap/flow_a`` endpoint.

    Args:
        order_type: Sent as the ``order_type`` entry of the request data and
            embedded in the task id.
        execute_time: Label that is only embedded in the task id.

    Returns:
        A ``SimpleHttpOperator`` created with ``dag=dag`` whose task id is
        ``flow_a_{order_type}_{execute_time}``.
    """
    return SimpleHttpOperator(
        dag=dag,
        task_id=f'flow_a_{order_type}_{execute_time}',
        http_conn_id=WIN_SERVER_1,
        method='GET',
        endpoint='sap/flow_a',
        data={'order_type': order_type},
        execution_timeout=timedelta(hours=3),
        # Set explicitly to True here, unlike the False in ``default_args``.
        depends_on_past=True,
        response_check=lambda response: response.json().get('Success'),
        response_filter=lambda response: response.json(),
    )
# One flow_a task per (order type, time label) pair.
flow_a_ww2dc_10_30 = flow_a(
    order_type='ww2dc',
    execute_time='10_30',
)
flow_a_ww2dc_17_30 = flow_a(
    order_type='ww2dc',
    execute_time='17_30',
)
flow_a_other_type_10_30 = flow_a(
    order_type='other_type',
    execute_time='10_30',
)
flow_a_other_type_15_30 = flow_a(
    order_type='other_type',
    execute_time='15_30',
)
flow_a_other_type_17_30 = flow_a(
    order_type='other_type',
    execute_time='17_30',
)
# GET tasks against the ``sap/*`` endpoints of the WIN_SERVER_1 connection.
# For each one, ``response_check`` returns the 'Success' entry of the JSON
# body and ``response_filter`` returns the parsed JSON body itself.
flow_b = SimpleHttpOperator(
    dag=dag,
    task_id='flow_b',
    http_conn_id=WIN_SERVER_1,
    method='GET',
    endpoint='sap/flow_b',
    execution_timeout=timedelta(hours=5),
    response_check=lambda response: response.json().get('Success'),
    response_filter=lambda response: response.json(),
)

flow_d = SimpleHttpOperator(
    dag=dag,
    task_id='flow_d',
    http_conn_id=WIN_SERVER_1,
    method='GET',
    endpoint='sap/flow_d',
    execution_timeout=timedelta(hours=3),
    response_check=lambda response: response.json().get('Success'),
    response_filter=lambda response: response.json(),
)

po_change = SimpleHttpOperator(
    dag=dag,
    task_id='po_change',
    http_conn_id=WIN_SERVER_1,
    method='GET',
    endpoint='sap/po_change',
    execution_timeout=timedelta(minutes=15),
    response_check=lambda response: response.json().get('Success'),
    response_filter=lambda response: response.json(),
)
def flow_a_monitor_func(task_id):
    """Create a GET task for the ``sap/flow_a/monitor_repeat`` endpoint.

    Args:
        task_id: Value formatted into the task id of the new operator.

    Returns:
        A ``SimpleHttpOperator`` created with ``dag=dag`` that sends
        ``SERVER_1_HEADERS`` with the request.
    """
    monitor = SimpleHttpOperator(
        dag=dag,
        task_id=f'{task_id}',
        http_conn_id=WIN_SERVER_1,
        method='GET',
        endpoint='sap/flow_a/monitor_repeat',
        headers=SERVER_1_HEADERS,
        execution_timeout=timedelta(minutes=5),
        response_check=lambda response: response.json().get('Success'),
        response_filter=lambda response: response.json(),
    )
    return monitor
flow_a_monitor = flow_a_monitor_func('flow_a_monitor')
flow_a_monitor2 = flow_a_monitor_func('flow_a_monitor2')

# Tasks that depend only on the entry task.
start >> matrix_collection
start >> flow_a_monitor

# Backbone: the time sensors follow each other in sequence.  The chain is
# parenthesised so it can span several lines; a continuation line that merely
# starts with ``>>`` outside parentheses is a SyntaxError.
(
    start
    >> sensor_10_30
    >> sensor_13
    >> sensor_14
    >> sensor_15_30
    >> sensor_17
    >> sensor_17_30
)

# Work that hangs off each time gate.
sensor_10_30 >> flow_a_ww2dc_10_30 >> flow_a_other_type_10_30
sensor_13 >> psd_report
sensor_14 >> save_hub_report
sensor_15_30 >> flow_a_other_type_15_30
sensor_17 >> po_change
sensor_17 >> flow_a_monitor2
sensor_17_30 >> flow_a_ww2dc_17_30 >> flow_a_other_type_17_30
sensor_17_30 >> flow_d >> flow_b
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]