assapin commented on a change in pull request #28:
URL: https://github.com/apache/incubator-liminal/pull/28#discussion_r587494158
##########
File path: liminal/build/python.py
##########
@@ -76,37 +44,56 @@ def _write_additional_files(self, temp_dir):
with open(requirements_file_path, 'w'):
pass
+ version_file_path = os.path.join(temp_dir, 'VERSION')
+ if not os.path.exists(version_file_path):
+ with open(version_file_path, 'w') as file:
+ repo = git.Repo(search_parent_directories=True)
+ file.write(repo.head.object.hexsha)
+
super()._write_additional_files(temp_dir)
+ @staticmethod
+ def _additional_files_from_paths():
+ python_dir = os.path.join(os.path.join(os.path.dirname(__file__),
'image'), 'python')
+ return [
+ os.path.join(python_dir, 'container-setup.sh'),
+ os.path.join(python_dir, 'container-teardown.sh'),
+ ]
+
def _additional_files_from_filename_content_pairs(self):
with open(self._dockerfile_path()) as original:
data = original.read()
data = self.__mount_pip_conf(data)
- data = self.__add_python_base_version(data)
return [('Dockerfile', data)]
def __mount_pip_conf(self, data):
new_data = data
-
if self.__PIP_CONF in self.config:
- new_data = '# syntax = docker/dockerfile:1.0-experimental\n' + data
- new_data = new_data.replace('{{mount}}',
-
'--mount=type=secret,id=pip_config,dst=/etc/pip.conf \\\n')
+ if os.getenv(self.USE_LEGACY_DOCKER_VERSION):
+ if os.getenv("PIP_ENV") is not None:
+ new_data = new_data.replace('{{copy_pip_conf}}', 'COPY
$PIP_ENV /etc/')
+ else:
+ new_data = new_data.replace('{{copy_pip_conf}}',
Review comment:
Is this in order to use internal PyPI repos inside the docker build?
##########
File path: liminal/build/python.py
##########
@@ -76,37 +44,56 @@ def _write_additional_files(self, temp_dir):
with open(requirements_file_path, 'w'):
pass
+ version_file_path = os.path.join(temp_dir, 'VERSION')
+ if not os.path.exists(version_file_path):
+ with open(version_file_path, 'w') as file:
+ repo = git.Repo(search_parent_directories=True)
+ file.write(repo.head.object.hexsha)
+
super()._write_additional_files(temp_dir)
+ @staticmethod
+ def _additional_files_from_paths():
+ python_dir = os.path.join(os.path.join(os.path.dirname(__file__),
'image'), 'python')
+ return [
+ os.path.join(python_dir, 'container-setup.sh'),
+ os.path.join(python_dir, 'container-teardown.sh'),
+ ]
+
def _additional_files_from_filename_content_pairs(self):
with open(self._dockerfile_path()) as original:
data = original.read()
data = self.__mount_pip_conf(data)
- data = self.__add_python_base_version(data)
return [('Dockerfile', data)]
def __mount_pip_conf(self, data):
new_data = data
-
if self.__PIP_CONF in self.config:
- new_data = '# syntax = docker/dockerfile:1.0-experimental\n' + data
- new_data = new_data.replace('{{mount}}',
-
'--mount=type=secret,id=pip_config,dst=/etc/pip.conf \\\n')
+ if os.getenv(self.USE_LEGACY_DOCKER_VERSION):
+ if os.getenv("PIP_ENV") is not None:
+ new_data = new_data.replace('{{copy_pip_conf}}', 'COPY
$PIP_ENV /etc/')
Review comment:
Why not use Jinja templating here?
##########
File path: liminal/core/config/config.py
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import traceback
+
+from liminal.core.config.defaults import hyperliminal, default_configs
+from liminal.core.util import dict_util
+from liminal.core.util import files_util
+
+
+class ConfigUtil:
+ """
+ Load and enrich config files under configs_path.
+ """
+ __HYPERLIMINAL = "hyperliminal"
+ __PIPELINES = "pipelines"
+ __SUPER = "super"
+ __TYPE = "type"
+ __SUB = "sub"
+ __SERVICES = "services"
+ __TASKS = "tasks"
+ __PIPELINE_DEFAULTS = "pipeline_defaults"
+
+ def __init__(self, configs_path):
+ self.configs_path = configs_path
+ self.config_files = files_util.load(configs_path)
+ self.hyperliminal = hyperliminal.HYPERLIMINAL
+ self.loaded_subliminals = []
+
+ def safe_load(self, is_render_variables):
+ """
+ :returns list of config files after enrich with defaults and supers
+ """
+ if self.loaded_subliminals:
+ return self.loaded_subliminals
+
+ configs = self.config_files.values()
+ enriched_configs = []
+
+ for subliminal in [config for config in configs if
self.__is_subliminal(config)]:
+ name = subliminal.get('name')
+ logging.info(f'Loading yml {name}')
+ # noinspection PyBroadException
+ try:
+ superliminal = self.__get_superliminal(subliminal)
+ enriched_config = self.__merge_configs(subliminal,
superliminal,
+ is_render_variables)
+ enriched_configs.append(enriched_config)
+ except Exception:
+ logging.error(f'Failed to load yml {name}')
+ traceback.print_exc()
+
+ self.loaded_subliminals = enriched_configs
Review comment:
Is there a way to see the result of the "merged" liminal files as
output, similar to `kustomize build`? That would be useful to test/verify
that the compilation was correct.
Perhaps it also makes sense to deploy the merged YAMLs from the file system
(i.e. they are part of the deploy CLI)?
##########
File path: liminal/runners/airflow/dag/liminal_dags.py
##########
@@ -15,105 +15,135 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import logging
+import os
+import traceback
from datetime import datetime, timedelta
-import yaml
from airflow import DAG
-from liminal.core import environment
+from liminal.core import environment as env
+from liminal.core.config.config import ConfigUtil
from liminal.core.util import class_util
-from liminal.core.util import files_util
from liminal.runners.airflow.model.task import Task
-from liminal.runners.airflow.tasks.defaults.job_end import JobEndTask
-from liminal.runners.airflow.tasks.defaults.job_start import JobStartTask
-import logging
__DEPENDS_ON_PAST = 'depends_on_past'
+# noinspection PyBroadException
def register_dags(configs_path):
"""
Registers pipelines in liminal yml files found in given path (recursively)
as airflow DAGs.
"""
- logging.info(f'Registering DAG from path: {configs_path}')
- config_files = files_util.find_config_files(configs_path)
+ logging.info(f'Registering DAGs from path: {configs_path}')
+ config_util = ConfigUtil(configs_path)
+ # TODO - change is_render_variable to False when runtime resolving is
available
+ configs = config_util.safe_load(is_render_variables=True)
dags = []
- logging.info(f'found {len(config_files)} in path: {configs_path}')
- for config_file in config_files:
- logging.info(f'Registering DAG for file: {config_file}')
-
- with open(config_file) as stream:
- config = yaml.safe_load(stream)
-
- for pipeline in config['pipelines']:
- pipeline_name = pipeline['pipeline']
+ logging.info(f'found {len(configs)} liminal configs in path:
{configs_path}')
+ for config in configs:
+ name = config['name'] if 'name' in config else None
+ try:
+ if not name:
+ raise ValueError('liminal.yml missing field `name`')
- default_args = {k: v for k, v in pipeline.items()}
+ logging.info(f"Registering DAGs for {name}")
- override_args = {
- 'start_date': datetime.combine(pipeline['start_date'],
datetime.min.time()),
- __DEPENDS_ON_PAST: default_args[
- __DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in
default_args else False,
- }
+ owner = config.get('owner')
- default_args.update(override_args)
+ trigger_rule = 'all_success'
+ if 'always_run' in config and config['always_run']:
+ trigger_rule = 'all_done'
- dag = DAG(
- dag_id=pipeline_name,
- default_args=default_args,
-
dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
- catchup=False
- )
-
- job_start_task = JobStartTask(dag, config, pipeline, {}, None,
'all_success')
- parent = job_start_task.apply_task_to_dag()
+ for pipeline in config['pipelines']:
+ default_args = __default_args(pipeline)
+ dag = __initialize_dag(default_args, pipeline, owner)
- trigger_rule = 'all_success'
- if 'always_run' in config and config['always_run']:
- trigger_rule = 'all_done'
+ parent = None
for task in pipeline['tasks']:
task_type = task['type']
task_instance = get_task_class(task_type)(
- dag, config, pipeline, task, parent if parent else
None, trigger_rule
+ task_id=task['task'],
+ dag=dag,
+ parent=parent,
+ trigger_rule=trigger_rule,
+ liminal_config=config,
+ pipeline_config=pipeline,
+ task_config=task
)
parent = task_instance.apply_task_to_dag()
- job_end_task = JobEndTask(dag, config, pipeline, {}, parent,
'all_done')
- job_end_task.apply_task_to_dag()
-
logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
- globals()[pipeline_name] = dag
+ globals()[pipeline['pipeline']] = dag
dags.append(dag)
+ except Exception:
+ logging.error(f'Failed to register DAGs for {name}')
+ traceback.print_exc()
+
return dags
+def __initialize_dag(default_args, pipeline, owner):
Review comment:
Would it be useful for these default behaviors to reside in the
super/hyper liminal definitions?
##########
File path: liminal/core/config/config.py
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import traceback
+
+from liminal.core.config.defaults import hyperliminal, default_configs
+from liminal.core.util import dict_util
+from liminal.core.util import files_util
+
+
+class ConfigUtil:
+ """
+ Load and enrich config files under configs_path.
+ """
+ __HYPERLIMINAL = "hyperliminal"
+ __PIPELINES = "pipelines"
+ __SUPER = "super"
+ __TYPE = "type"
+ __SUB = "sub"
+ __SERVICES = "services"
+ __TASKS = "tasks"
+ __PIPELINE_DEFAULTS = "pipeline_defaults"
+
+ def __init__(self, configs_path):
+ self.configs_path = configs_path
+ self.config_files = files_util.load(configs_path)
+ self.hyperliminal = hyperliminal.HYPERLIMINAL
Review comment:
Wouldn't it be better to name hyperliminal something with a more
"standard" association, like archetype?
##########
File path: liminal/build/image/python/Dockerfile
##########
@@ -28,13 +29,37 @@ WORKDIR /app
# Order of operations is important here for docker's caching & incremental
build performance. !
# Be careful when changing this code.
!
+# Update apt
+RUN echo "Updating apt"
+RUN apt-get update --fix-missing
+
+# Install gcc
+RUN echo "Installing gcc"
+RUN apt-get install -y gcc
+
+# Install git
+RUN echo "Installing git"
+RUN apt-get install -y git
+ARG GIT_USER=no_user_supplied
+ARG GIT_PASSWORD=""
+ARG GIT_AUTH=$GIT_USER":"$GIT_PASSWORD
+
# Install any needed packages specified in requirements.txt
COPY ./requirements.txt /app/
+{{copy_pip_conf}}
# mount the secret in the correct location, then run pip install
RUN pip install --upgrade pip
RUN {{mount}} pip install -r requirements.txt
+# Install pipenv
+RUN echo "Installing pipenv.."
+RUN pip install pipenv
+
+# Install awscli
+RUN echo "Installing awscli.."
Review comment:
Why do you need awscli inside the Docker image of all liminal tasks?
##########
File path: liminal/core/util/class_util.py
##########
@@ -61,10 +60,11 @@ def import_module(package, recursive=True):
package = importlib.import_module(package)
results = {}
for loader, name, is_pkg in pkgutil.walk_packages(package.__path__):
- full_name = package.__name__ + '.' + name
- results[full_name] = importlib.import_module(full_name)
- if recursive and is_pkg:
- results.update(import_module(full_name))
+ if not name == 'liminal_python_server':
Review comment:
This feels like a workaround. Why is this `if` needed?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]