ayanray089 opened a new issue #10408:
URL: https://github.com/apache/airflow/issues/10408


   Hi,
   
   I am facing an issue while creating a dynamic workflow from a JSON file. The 
goal is to store all parent and child relationships in a JSON config file. I 
want to create the DAG workflow dynamically, based on the relationships defined 
in that file.
   It works for a simple sequential scenario where the current task is the 
parent of the next downstream task.
   
   JSON file:
   task_dependency = '{"dependency": [{"Task": "step_1","Parent":"step_0", 
"command":"echo hello step 1"},{"Task": "step_2","Parent": "step_1", 
"command":"echo hello step 2"},{"Task": "step_3", "Parent": "step_2", 
"command":"echo hello step 3"},{"Task": "step_4", "Parent": "step_2", 
"command":"echo hello step 4"}]}'
   
   **This works:**
   
   from datetime import timedelta
   import airflow
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.operators.dummy_operator import DummyOperator
   from helper import operation
   import json
   import logging
   
   # First attempt from the report: builds one BashOperator per entry of the
   # JSON config and chains every task to the one built in the previous
   # iteration.
   # Default arguments handed to the DAG constructor below.
   default_args = {
       'owner' : 'some.user',
       'start_date' : airflow.utils.dates.days_ago(1),
       # 'end_date' : datetime(2020, 08, 31), ## optional
       'depends_on_past' : False,
       'email' : ['[email protected]'],
       'email_on_failure': False,
       'email_on_success': False,
       'retries' : 1,
       'retry_delay' : timedelta(minutes = 2)
   }
   # Instantiate the DAG that every task below is attached to.
   dag = DAG (
       'task_dependency_json',
       default_args = default_args,
       description = 'Simple Demo DAG TASK Dependency',
       schedule_interval = '*/60 * * * *', ### This should be cron expression
   )
   # Tasks: parse the JSON string stored on the helper module.
   # NOTE(review): the sample JSON earlier in this message is assigned to
   # `task_dependency`, while this line reads `operation.task_dependency_1` --
   # presumably the same payload under another name; confirm against helper.
   dep = json.loads(operation.task_dependency_1)
   
   
   
   # One iteration per config entry; `parent` carries the task built in the
   # previous iteration over to the next one.
   for i in range(len(dep['dependency'])):
   
   
       # Task id and shell command both come from the current config entry.
       task = BashOperator(
         task_id=dep['dependency'][i]["Task"],
         bash_command=dep['dependency'][i]["command"],
         dag=dag)
       
   
       if i==0:
           # First entry: there is no earlier task to link to yet.
           parent = task
   
       else:
           
           # Links to the previous task in list order. The entry's "Parent" key
           # is never read, which is why the resulting DAG is always a single
           # linear chain regardless of what the config declares.
           task.set_upstream(parent)
           parent = task
   
           
       # Debug output of the operators handled in this iteration.
       print('Parent',parent)
       print('Task',task)
       
   **But it creates a DAG like:**
   
   step 1 >> step 2 >> step 3 >> step 4
   
   **But I want a DAG like:**
   
   step 1 >> step 2 >> step 3
                    >> step 4
   
   **To implement the above requirement, I wrote the code below:**
   
   from datetime import timedelta
   import airflow
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import PythonOperator
   from helper import operation
   import json
   import logging
   # Second attempt from the report: tries to wire each task to the task named
   # by its "Parent" key instead of to the previously created task.
   # Default arguments handed to the DAG constructor below.
   default_args = {
       'owner' : 'some.user',
       'start_date' : airflow.utils.dates.days_ago(1),
       # 'end_date' : datetime(2020, 08, 31), ## optional
       'depends_on_past' : False,
       'email' : ['[email protected]'],
       'email_on_failure': False,
       'email_on_success': False,
       'retries' : 1,
       'retry_delay' : timedelta(minutes = 1)
   }
   # Instantiate the DAG that every task below is attached to.
   dag = DAG (
       'task_dependency_json',
       default_args = default_args,
       description = 'Simple Demo DAG TASK Dependency',
       schedule_interval = '*/60 * * * *', ### This should be cron expression
   )
   # Tasks: parse the JSON string stored on the helper module.
   dep = json.loads(operation.task_dependency)
   # One iteration per config entry.
   for i in range(len(dep['dependency'])):
       # Every task runs the fixed command 'date'; the entry's "command" value
       # is not used in this version.
       task = BashOperator(
         task_id=dep['dependency'][i]["Task"],
         bash_command='date',
         dag=dag)
       #print('Iteration',i)
       logging.info("test")
       # The first entry (i == 0) gets no upstream link at all.
       if i>0:
           # NOTE(review): this constructs a brand-new operator named after the
           # entry's "Parent" instead of reusing the operator an earlier
           # iteration already created under that name. With the sample config
           # shown in this message, each such name has already been registered
           # on `dag` as a "Task", so the same task_id is added twice --
           # presumably the cause of the reported problem; confirm how the
           # installed Airflow version treats duplicate task ids.
           # Likely remedy: keep a dict of task_id -> operator, create every
           # task once, then look the parent up in that dict before linking.
           parent = BashOperator(
                   task_id=dep['dependency'][i]["Parent"],
                   bash_command='date',
                   dag=dag)
           parent.set_downstream(task)
   
   
   Can you please let me know how I can achieve this?
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to