This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 77139de6083d355b8bd80c07a3961b6d9ba8408d Author: aviemzur <[email protected]> AuthorDate: Thu Mar 12 16:52:40 2020 +0200 Fix rainbow_dags python task --- rainbow/build/build_rainbows.py | 2 +- rainbow/{docker => build}/python/Dockerfile | 0 rainbow/build/python/__init__.py | 0 rainbow/build/python/container-setup.sh | 2 +- rainbow/build/python/container-teardown.sh | 2 +- rainbow/{docker => build}/python/python_image.py | 18 ++++++++----- rainbow/core/util/files_util.py | 2 ++ rainbow/docker/python/__init__.py | 17 ------------ rainbow/runners/airflow/dag/rainbow_dags.py | 31 +++++++++++++++------- rainbow/runners/airflow/tasks/python.py | 9 +++---- .../airflow/build/python/test_python_image.py | 2 +- tests/runners/airflow/rainbow/rainbow.yml | 6 ++--- tests/runners/airflow/tasks/test_python.py | 2 -- 13 files changed, 45 insertions(+), 48 deletions(-) diff --git a/rainbow/build/build_rainbows.py b/rainbow/build/build_rainbows.py index 1452bb8..2a9e6a3 100644 --- a/rainbow/build/build_rainbows.py +++ b/rainbow/build/build_rainbows.py @@ -21,7 +21,7 @@ import os import yaml from rainbow.core.util import files_util -from rainbow.docker.python.python_image import PythonImage +from rainbow.build.python.python_image import PythonImage def build_rainbows(path): diff --git a/rainbow/docker/python/Dockerfile b/rainbow/build/python/Dockerfile similarity index 100% rename from rainbow/docker/python/Dockerfile rename to rainbow/build/python/Dockerfile diff --git a/rainbow/build/python/__init__.py b/rainbow/build/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rainbow/build/python/container-setup.sh b/rainbow/build/python/container-setup.sh index 6e8d242..4e20fc2 100755 --- a/rainbow/build/python/container-setup.sh +++ b/rainbow/build/python/container-setup.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh echo """$RAINBOW_INPUT""" > rainbow_input.json diff --git a/rainbow/build/python/container-teardown.sh b/rainbow/build/python/container-teardown.sh index 1219407..ef213a8 100755 --- a/rainbow/build/python/container-teardown.sh +++ b/rainbow/build/python/container-teardown.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh USER_CONFIG_OUTPUT_FILE=$1 if [ "$USER_CONFIG_OUTPUT_FILE" != "" ]; then diff --git a/rainbow/docker/python/python_image.py b/rainbow/build/python/python_image.py similarity index 82% rename from rainbow/docker/python/python_image.py rename to rainbow/build/python/python_image.py index ae7bc23..f0fb3a0 100644 --- a/rainbow/docker/python/python_image.py +++ b/rainbow/build/python/python_image.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + import os import shutil import tempfile @@ -24,7 +25,7 @@ import docker class PythonImage: - def build(self, base_path, relative_source_path, tag, extra_files=None): + def build(self, base_path, relative_source_path, tag): """ TODO: pydoc @@ -35,9 +36,6 @@ class PythonImage: :return: """ - if extra_files is None: - extra_files = [] - print(f'Building image {tag}') temp_dir = tempfile.mkdtemp() @@ -51,16 +49,24 @@ class PythonImage: with open(requirements_file_path, 'w'): pass - dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile') + docker_files = [ + os.path.join(os.path.dirname(__file__), 'Dockerfile'), + os.path.join(os.path.dirname(__file__), 'container-setup.sh'), + os.path.join(os.path.dirname(__file__), 'container-teardown.sh') + ] - for file in extra_files + [dockerfile_path]: + for file in docker_files: self.__copy_file(file, temp_dir) docker_client = docker.from_env() + + # TODO: log docker output docker_client.images.build(path=temp_dir, tag=tag) docker_client.close() + print(temp_dir, os.listdir(temp_dir)) + shutil.rmtree(temp_dir) @staticmethod diff --git a/rainbow/core/util/files_util.py b/rainbow/core/util/files_util.py index 403fec9..b1d1daf 100644 --- a/rainbow/core/util/files_util.py +++ b/rainbow/core/util/files_util.py @@ -21,8 +21,10 @@ import os def find_config_files(path): files = [] + print(path) for r, d, f in os.walk(path): for file in f: + print(os.path.basename(file)) if os.path.basename(file) in ['rainbow.yml', 'rainbow.yaml']: files.append(os.path.join(r, file)) return files diff --git a/rainbow/docker/python/__init__.py b/rainbow/docker/python/__init__.py deleted file mode 100644 index 217e5db..0000000 --- a/rainbow/docker/python/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py index 8557455..92b6d64 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -20,6 +20,7 @@ from datetime import datetime import yaml from airflow import DAG +from airflow.models import Variable from rainbow.core.util import files_util from rainbow.runners.airflow.tasks.python import PythonTask @@ -35,7 +36,7 @@ def register_dags(configs_path): dags = [] for config_file in config_files: - print(f'Registering DAG for file: f{config_file}') + print(f'Registering DAG for file: {config_file}') with open(config_file) as stream: config = yaml.safe_load(stream) @@ -43,24 +44,35 @@ def register_dags(configs_path): for pipeline in config['pipelines']: parent = None + pipeline_name = pipeline['pipeline'] + default_args = { 'owner': config['owner'], - 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()) + 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()), + 'depends_on_past': False, } dag = DAG( - dag_id='test_dag', - default_args=default_args + dag_id=pipeline_name, + default_args=default_args, + catchup=False ) + trigger_rule = 'all_success' + if 'always_run' in config and config['always_run']: + trigger_rule = 'all_done' + for task in pipeline['tasks']: task_type = task['type'] task_instance = get_task_class(task_type)( - dag, pipeline['pipeline'], parent if parent else None, task, 'all_success' + dag, pipeline['pipeline'], parent if parent else None, task, trigger_rule ) + parent = task_instance.apply_task_to_dag() - print(f'{parent}{{{task_type}}}') + print(f'{pipeline_name}: {dag.tasks}') + + globals()[pipeline_name] = dag dags.append(dag) return dags @@ -75,7 +87,6 @@ def get_task_class(task_type): return task_classes[task_type] -if __name__ == '__main__': - # TODO: configurable yaml dir - path = 'tests/runners/airflow/dag/rainbow' - register_dags(path) +# TODO: configurable path +path = Variable.get('rainbows_dir') +register_dags(path) diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py index b2769c8..ac46d0b 100644 --- a/rainbow/runners/airflow/tasks/python.py +++ b/rainbow/runners/airflow/tasks/python.py @@ -16,12 +16,10 @@ # specific language governing permissions and limitations # under the License. import json -import os from airflow.models import Variable from airflow.operators.dummy_operator import DummyOperator -from rainbow.docker.python import python_image from rainbow.runners.airflow.model import task from rainbow.runners.airflow.operators.kubernetes_pod_operator import \ ConfigurableKubernetesPodOperator, \ @@ -136,10 +134,11 @@ class PythonTask(task.Task): def __kubernetes_cmds_and_arguments(self): cmds = ['/bin/bash', '-c'] + output_path = self.config['output_path'] if 'output_path' in self.config else '' arguments = [ - f'''sh container-setup.sh && \ - {self.config['cmd']} && \ - sh container-teardown.sh {self.config['output_path']}''' + f"sh container-setup.sh && " + + f"{self.config['cmd']} && " + + f"sh container-teardown.sh {output_path}" ] return cmds, arguments diff --git a/tests/runners/airflow/build/python/test_python_image.py b/tests/runners/airflow/build/python/test_python_image.py index a8c02b6..368b05d 100644 --- a/tests/runners/airflow/build/python/test_python_image.py +++ b/tests/runners/airflow/build/python/test_python_image.py @@ -19,7 +19,7 @@ from unittest import TestCase import docker -from rainbow.docker.python.python_image import PythonImage +from rainbow.build.python.python_image import PythonImage class TestPythonImage(TestCase): diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml index 1a834d7..3e3ec4b 100644 --- a/tests/runners/airflow/rainbow/rainbow.yml +++ b/tests/runners/airflow/rainbow/rainbow.yml @@ -35,8 +35,7 @@ pipelines: env2: "b" input_type: static input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}" - output_path: 'baz' - cmd: 'foo bar' + cmd: 'python hello_world.py' - task: my_static_config_task2 type: python description: my 1st ds task @@ -47,8 +46,7 @@ pipelines: env2: "b" input_type: static input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}" - output_path: 'baz' - cmd: 'foo bar' + cmd: 'python hello_world.py' services: - service: name: myserver1 diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py index 8477c69..ffdcac3 100644 --- a/tests/runners/airflow/tasks/test_python.py +++ b/tests/runners/airflow/tasks/test_python.py @@ -19,8 +19,6 @@ import unittest from unittest import TestCase -import docker - from rainbow.runners.airflow.operators.kubernetes_pod_operator import \ ConfigurableKubernetesPodOperator from rainbow.runners.airflow.tasks import python
