zionrubin commented on a change in pull request #28:
URL: https://github.com/apache/incubator-liminal/pull/28#discussion_r596859273
##########
File path: liminal/runners/airflow/dag/liminal_dags.py
##########
@@ -15,105 +15,135 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import logging
+import os
+import traceback
from datetime import datetime, timedelta
-import yaml
from airflow import DAG
-from liminal.core import environment
+from liminal.core import environment as env
+from liminal.core.config.config import ConfigUtil
from liminal.core.util import class_util
-from liminal.core.util import files_util
from liminal.runners.airflow.model.task import Task
-from liminal.runners.airflow.tasks.defaults.job_end import JobEndTask
-from liminal.runners.airflow.tasks.defaults.job_start import JobStartTask
-import logging
__DEPENDS_ON_PAST = 'depends_on_past'
+# noinspection PyBroadException
def register_dags(configs_path):
"""
Registers pipelines in liminal yml files found in given path (recursively)
as airflow DAGs.
"""
- logging.info(f'Registering DAG from path: {configs_path}')
- config_files = files_util.find_config_files(configs_path)
+ logging.info(f'Registering DAGs from path: {configs_path}')
+ config_util = ConfigUtil(configs_path)
+ # TODO - change is_render_variable to False when runtime resolving is
available
+ configs = config_util.safe_load(is_render_variables=True)
dags = []
- logging.info(f'found {len(config_files)} in path: {configs_path}')
- for config_file in config_files:
- logging.info(f'Registering DAG for file: {config_file}')
-
- with open(config_file) as stream:
- config = yaml.safe_load(stream)
-
- for pipeline in config['pipelines']:
- pipeline_name = pipeline['pipeline']
+ logging.info(f'found {len(configs)} liminal configs in path:
{configs_path}')
+ for config in configs:
+ name = config['name'] if 'name' in config else None
+ try:
+ if not name:
+ raise ValueError('liminal.yml missing field `name`')
- default_args = {k: v for k, v in pipeline.items()}
+ logging.info(f"Registering DAGs for {name}")
- override_args = {
- 'start_date': datetime.combine(pipeline['start_date'],
datetime.min.time()),
- __DEPENDS_ON_PAST: default_args[
- __DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in
default_args else False,
- }
+ owner = config.get('owner')
- default_args.update(override_args)
+ trigger_rule = 'all_success'
+ if 'always_run' in config and config['always_run']:
+ trigger_rule = 'all_done'
- dag = DAG(
- dag_id=pipeline_name,
- default_args=default_args,
-
dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
- catchup=False
- )
-
- job_start_task = JobStartTask(dag, config, pipeline, {}, None,
'all_success')
- parent = job_start_task.apply_task_to_dag()
+ for pipeline in config['pipelines']:
+ default_args = __default_args(pipeline)
+ dag = __initialize_dag(default_args, pipeline, owner)
- trigger_rule = 'all_success'
- if 'always_run' in config and config['always_run']:
- trigger_rule = 'all_done'
+ parent = None
for task in pipeline['tasks']:
task_type = task['type']
task_instance = get_task_class(task_type)(
- dag, config, pipeline, task, parent if parent else
None, trigger_rule
+ task_id=task['task'],
+ dag=dag,
+ parent=parent,
+ trigger_rule=trigger_rule,
+ liminal_config=config,
+ pipeline_config=pipeline,
+ task_config=task
)
parent = task_instance.apply_task_to_dag()
- job_end_task = JobEndTask(dag, config, pipeline, {}, parent,
'all_done')
- job_end_task.apply_task_to_dag()
-
logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
- globals()[pipeline_name] = dag
+ globals()[pipeline['pipeline']] = dag
dags.append(dag)
+ except Exception:
+ logging.error(f'Failed to register DAGs for {name}')
+ traceback.print_exc()
+
return dags
+def __initialize_dag(default_args, pipeline, owner):
Review comment:
I'm not sure what you mean.
Note that the code here operates on the DAGs, not on the liminal config
files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]