zionrubin commented on a change in pull request #28:
URL: https://github.com/apache/incubator-liminal/pull/28#discussion_r596859273



##########
File path: liminal/runners/airflow/dag/liminal_dags.py
##########
@@ -15,105 +15,135 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import logging
+import os
+import traceback
 from datetime import datetime, timedelta
 
-import yaml
 from airflow import DAG
 
-from liminal.core import environment
+from liminal.core import environment as env
+from liminal.core.config.config import ConfigUtil
 from liminal.core.util import class_util
-from liminal.core.util import files_util
 from liminal.runners.airflow.model.task import Task
-from liminal.runners.airflow.tasks.defaults.job_end import JobEndTask
-from liminal.runners.airflow.tasks.defaults.job_start import JobStartTask
-import logging
 
 __DEPENDS_ON_PAST = 'depends_on_past'
 
 
+# noinspection PyBroadException
 def register_dags(configs_path):
     """
     Registers pipelines in liminal yml files found in given path (recursively) as airflow DAGs.
     """
-    logging.info(f'Registering DAG from path: {configs_path}')
-    config_files = files_util.find_config_files(configs_path)
+    logging.info(f'Registering DAGs from path: {configs_path}')
+    config_util = ConfigUtil(configs_path)
+    # TODO - change is_render_variables to False when runtime resolving is available
+    configs = config_util.safe_load(is_render_variables=True)
 
     dags = []
-    logging.info(f'found {len(config_files)} in path: {configs_path}')
-    for config_file in config_files:
-        logging.info(f'Registering DAG for file: {config_file}')
-
-        with open(config_file) as stream:
-            config = yaml.safe_load(stream)
-
-            for pipeline in config['pipelines']:
-                pipeline_name = pipeline['pipeline']
+    logging.info(f'found {len(configs)} liminal configs in path: {configs_path}')
+    for config in configs:
+        name = config['name'] if 'name' in config else None
+        try:
+            if not name:
+                raise ValueError('liminal.yml missing field `name`')
 
-                default_args = {k: v for k, v in pipeline.items()}
+            logging.info(f"Registering DAGs for {name}")
 
-                override_args = {
-                    'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
-                    __DEPENDS_ON_PAST: default_args[
-                        __DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in default_args else False,
-                }
+            owner = config.get('owner')
 
-                default_args.update(override_args)
+            trigger_rule = 'all_success'
+            if 'always_run' in config and config['always_run']:
+                trigger_rule = 'all_done'
 
-                dag = DAG(
-                    dag_id=pipeline_name,
-                    default_args=default_args,
-                    dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
-                    catchup=False
-                )
-
-                job_start_task = JobStartTask(dag, config, pipeline, {}, None, 'all_success')
-                parent = job_start_task.apply_task_to_dag()
+            for pipeline in config['pipelines']:
+                default_args = __default_args(pipeline)
+                dag = __initialize_dag(default_args, pipeline, owner)
 
-                trigger_rule = 'all_success'
-                if 'always_run' in config and config['always_run']:
-                    trigger_rule = 'all_done'
+                parent = None
 
                 for task in pipeline['tasks']:
                     task_type = task['type']
                     task_instance = get_task_class(task_type)(
-                        dag, config, pipeline, task, parent if parent else None, trigger_rule
+                        task_id=task['task'],
+                        dag=dag,
+                        parent=parent,
+                        trigger_rule=trigger_rule,
+                        liminal_config=config,
+                        pipeline_config=pipeline,
+                        task_config=task
                     )
 
                     parent = task_instance.apply_task_to_dag()
 
-                job_end_task = JobEndTask(dag, config, pipeline, {}, parent, 'all_done')
-                job_end_task.apply_task_to_dag()
-
                 logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
 
-                globals()[pipeline_name] = dag
+                globals()[pipeline['pipeline']] = dag
                 dags.append(dag)
 
+        except Exception:
+            logging.error(f'Failed to register DAGs for {name}')
+            traceback.print_exc()
+
     return dags
 
 
+def __initialize_dag(default_args, pipeline, owner):
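
The hunk is truncated right after this signature, so the bodies of the new `__default_args` and `__initialize_dag` helpers are not shown. A plausible sketch, reconstructed from the inline code this hunk removes (they rely on the module's existing `datetime`, `timedelta`, and `DAG` imports; the `owner` handling is an assumption, since the removed code never set it):

```python
def __default_args(pipeline):
    # Start from the raw pipeline config, then override the fields Airflow
    # expects in a specific form (mirrors the removed inline logic).
    default_args = dict(pipeline)
    default_args.update({
        'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
        __DEPENDS_ON_PAST: default_args.get(__DEPENDS_ON_PAST, False),
    })
    return default_args


def __initialize_dag(default_args, pipeline, owner):
    # Assumption: owner is applied through default_args; not shown in the hunk.
    if owner:
        default_args['owner'] = owner
    return DAG(
        dag_id=pipeline['pipeline'],
        default_args=default_args,
        dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
        catchup=False
    )
```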

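For reference, a sketch of the liminal.yml shape this loop consumes, inferred from the keys the new code reads (all values below are illustrative, including the task type):

```yaml
name: my_liminal_app        # required: registration raises ValueError without it
owner: data-team            # optional: read via config.get('owner')
always_run: false           # true switches trigger_rule to 'all_done'
pipelines:
  - pipeline: my_pipeline   # becomes the DAG id
    start_date: 2021-01-01  # combined with datetime.min.time()
    timeout_minutes: 45     # becomes dagrun_timeout
    tasks:
      - task: my_task       # becomes the task_id
        type: python        # resolved via get_task_class(task_type)
```

Note also that the new per-config `try`/`except` means a single malformed liminal.yml is logged and skipped instead of failing DAG registration for every config found under `configs_path`.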
Review comment:
       Not sure what you mean. Note that the code here operates on the DAGs themselves, not on the liminal configuration.
   
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

