neilharia7 commented on issue #15607:
URL: https://github.com/apache/airflow/issues/15607#issuecomment-889282522
@kaxil
```
import datetime as dt
import json
import boto3
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator,
BranchPythonOperator
from tasks_functions.custom_functions import customized_function
from zeus.config import *
from zeus.utils import *
default_args = {
'owner': 'neilharia7',
'start_date': dt.datetime(2020, 6, 26),
'retries': 1,
'retry_delay': 30
}
# fetch Dags from s3 bucket
s3_client = boto3.client('s3', region_name='ap-south-1')
flag, dag_information = False, dict()
try:
dag_information = json.loads(
s3_client.get_object(Bucket=Config.AWS.S3.bucket_name,
Key=Config.AWS.S3.key_path)['Body'].read())
if dag_information:
flag = True
except Exception as e:
print(f'No dags registered {e}')
def number_of_keys(obj):
"""
# check if the number of keys are greater than 1
:param obj:
:return:
"""
if isinstance(obj, dict):
return True if len([k for k, v in obj.items()]) > 1 else False
if isinstance(obj, list):
return True if len(obj) > 1 else False
if flag:
for dag_data in dag_information.get('dag_structure', list()):
dag_registry = {
'owner': default_args.get('owner'),
'start_date': dt.datetime(2020, 6, 26),
'retries': dag_data.get('retries',
default_args.get('retries')),
'retry_delay':
dt.timedelta(seconds=dag_data.get('retry_delay',
default_args.get('retry_delay'))),
'max_retry_delay':
dt.timedelta(seconds=dag_data.get('max_retry_delay', 3600)),
'retry_exponential_backoff':
dag_data.get('exponential_retry', True)
}
with DAG(
dag_id=dag_data.get('name',
dag_data.get('dag_id')),
default_args=dag_registry,
schedule_interval=dag_data.get('scheduler',
None)
) as dag:
# reverse mapping
data_list = dag_data.get('data', list())[::-1]
task_register = [dynamic_task_composer(task_data, dag)
for task_data in data_list]
reverse_dict = {"data": data_list}
task_len = len(task_register)
# dynamic mapping
for child_idx, child_info in
enumerate(reverse_dict['data']):
if child_info.get('parent_task'): # check if
there are any parents of this task
for parent_idx, parent_info in
enumerate(dag_data.get('data')):
if parent_info.get('task_name')
in child_info.get('parent_task'):
task_register[child_idx] << task_register[task_len - parent_idx - 1]
# dynamic dag registration
globals()[dag_data.get('dag_id')] = dag
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]