ayanray089 opened a new issue #10408:
URL: https://github.com/apache/airflow/issues/10408
Hi,
I am facing an issue while creating a dynamic workflow from a JSON file. The
goal is to store all parent/child relationships in a JSON config file and
build the DAG workflow dynamically from that config.
It works for the simple sequential scenario, where each task is the parent
of the next downstream task.
JSON file:
task_dependency = '''{"dependency": [
    {"Task": "step_1", "Parent": "step_0", "command": "echo hello step 1"},
    {"Task": "step_2", "Parent": "step_1", "command": "echo hello step 2"},
    {"Task": "step_3", "Parent": "step_2", "command": "echo hello step 3"},
    {"Task": "step_4", "Parent": "step_2", "command": "echo hello step 4"}
]}'''
**This works:**
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from helper import operation
import json
import logging
default_args = {
    'owner': 'some.user',
    'start_date': airflow.utils.dates.days_ago(1),
    # 'end_date': datetime(2020, 8, 31),  # optional
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_success': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}
# Instantiate a DAG
dag = DAG(
    'task_dependency_json',
    default_args=default_args,
    description='Simple Demo DAG TASK Dependency',
    schedule_interval='*/60 * * * *',  # cron expression
)
# Tasks
dep = json.loads(operation.task_dependency_1)
for i in range(len(dep['dependency'])):
    task = BashOperator(
        task_id=dep['dependency'][i]["Task"],
        bash_command=dep['dependency'][i]["command"],
        dag=dag)
    if i == 0:
        parent = task
    else:
        task.set_upstream(parent)
        parent = task
    print('Parent', parent)
    print('Task', task)
**But it creates a DAG like:**
step_1 >> step_2 >> step_3 >> step_4
**But I want a DAG like:**
step_1 >> step_2 >> [step_3, step_4]
(step_3 and step_4 should both run downstream of step_2, in parallel, since both have step_2 as their Parent in the JSON)
**To implement the above requirement, I wrote the code below:**
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from helper import operation
import json
import logging
default_args = {
    'owner': 'some.user',
    'start_date': airflow.utils.dates.days_ago(1),
    # 'end_date': datetime(2020, 8, 31),  # optional
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_success': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
# Instantiate a DAG
dag = DAG(
    'task_dependency_json',
    default_args=default_args,
    description='Simple Demo DAG TASK Dependency',
    schedule_interval='*/60 * * * *',  # cron expression
)
# Tasks
dep = json.loads(operation.task_dependency)
for i in range(len(dep['dependency'])):
    task = BashOperator(
        task_id=dep['dependency'][i]["Task"],
        bash_command='date',
        dag=dag)
    # print('Iteration', i)
    logging.info("test")
    if i > 0:
        parent = BashOperator(
            task_id=dep['dependency'][i]["Parent"],
            bash_command='date',
            dag=dag)
        parent.set_downstream(task)
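Looking at it again, I suspect the second snippet fails because the `i>0` branch constructs a brand-new BashOperator for the parent, so most task_ids end up defined twice in the same DAG. A two-pass approach might work instead: create every operator exactly once, then look the parent up by task_id. Untested sketch below; the `Task` class is just a stand-in for `BashOperator` so the wiring logic is visible outside Airflow:

```python
import json

# Stand-in for BashOperator; in the real DAG file you would create
# BashOperator(task_id=..., bash_command=d["command"], dag=dag) instead.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def set_upstream(self, other):
        self.upstream.append(other.task_id)

task_dependency = '''{"dependency": [
    {"Task": "step_1", "Parent": "step_0", "command": "echo hello step 1"},
    {"Task": "step_2", "Parent": "step_1", "command": "echo hello step 2"},
    {"Task": "step_3", "Parent": "step_2", "command": "echo hello step 3"},
    {"Task": "step_4", "Parent": "step_2", "command": "echo hello step 4"}
]}'''

dep = json.loads(task_dependency)

# Pass 1: create every task exactly once, keyed by its task_id.
tasks = {d["Task"]: Task(d["Task"]) for d in dep["dependency"]}

# Pass 2: wire each task to its parent, but only when the parent is
# itself defined in the file (step_0 never is, so step_1 stays a root).
for d in dep["dependency"]:
    if d["Parent"] in tasks:
        tasks[d["Task"]].set_upstream(tasks[d["Parent"]])
```

With this, step_3 and step_4 both end up directly downstream of step_2, which is the fan-out shape I want; is this the right way, or is there a more idiomatic Airflow pattern for it?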
Can you please let me know how I can achieve this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]