This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 326a042ce87e979c4d97bb6f1e9c8e89ec6c09c0
Author: aviemzur <[email protected]>
AuthorDate: Sun Mar 15 15:23:57 2020 +0200

    Change pythontask config to input/output enhancement
---
 rainbow/build/python/container-setup.sh            |   2 +-
 .../airflow/operators/kubernetes_pod_operator.py   | 140 -------------------
 .../kubernetes_pod_operator_with_input_output.py   | 148 +++++++++++++++++++++
 rainbow/runners/airflow/tasks/python.py            |  57 ++++----
 .../airflow/build/python/test_python_image.py      |  14 +-
 tests/runners/airflow/build/test_build_rainbow.py  |   2 +-
 .../airflow/rainbow/hello_world/hello_world.py     |   9 ++
 tests/runners/airflow/rainbow/rainbow.yml          |  35 +++--
 tests/runners/airflow/tasks/test_python.py         |   6 +-
 9 files changed, 225 insertions(+), 188 deletions(-)

diff --git a/rainbow/build/python/container-setup.sh b/rainbow/build/python/container-setup.sh
index 4e20fc2..883f1e1 100755
--- a/rainbow/build/python/container-setup.sh
+++ b/rainbow/build/python/container-setup.sh
@@ -1,6 +1,6 @@
 #!/bin/sh

-echo """$RAINBOW_INPUT""" > rainbow_input.json
+echo """$RAINBOW_INPUT""" > /rainbow_input.json

 AIRFLOW_RETURN_FILE=/airflow/xcom/return.json

diff --git a/rainbow/runners/airflow/operators/kubernetes_pod_operator.py b/rainbow/runners/airflow/operators/kubernetes_pod_operator.py
deleted file mode 100644
index a7b0bdd..0000000
--- a/rainbow/runners/airflow/operators/kubernetes_pod_operator.py
+++ /dev/null
@@ -1,140 +0,0 @@
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-import json
-import traceback
-from airflow.models import DAG, TaskInstance
-from airflow.utils import timezone
-from random import randint
-
-
-def split_list(seq, num):
-    avg = len(seq) / float(num)
-    out = []
-    last = 0.0
-
-    while last < len(seq):
-        out.append(seq[int(last):int(last + avg)])
-        last += avg
-
-    return out
-
-
-class ConfigureParallelExecutionOperator(KubernetesPodOperator):
-
-    def __init__(self,
-                 config_type=None,
-                 config_path=None,
-                 executors=1,
-                 *args,
-                 **kwargs):
-        namespace = kwargs['namespace']
-        image = kwargs['image']
-        name = kwargs['name']
-
-        del kwargs['namespace']
-        del kwargs['image']
-        del kwargs['name']
-
-        super().__init__(
-            namespace=namespace,
-            image=image,
-            name=name,
-            *args,
-            **kwargs)
-        self.config_type = config_type
-        self.config_path = config_path
-        self.executors = executors
-
-    def execute(self, context):
-        config_dict = {}
-
-        self.log.info(f'config type: {self.config_type}')
-
-        if self.config_type:
-            if self.config_type == 'file':
-                config_dict = {}  # future feature: return config from file
-            elif self.config_type == 'sql':
-                config_dict = {}  # future feature: return from sql config
-            elif self.config_type == 'task':
-                ti = context['task_instance']
-                self.log.info(self.config_path)
-                config_dict = ti.xcom_pull(task_ids=self.config_path)
-            elif self.config_type == 'static':
-                config_dict = json.loads(self.config_path)
-            else:
-                raise ValueError(f'Unknown config type: {self.config_type}')
-
-        run_id = context['dag_run'].run_id
-
-        return_conf = {'config_type': self.config_type,
-                       'splits': {'0': {'run_id': run_id, 'configs': []}}}
-
-        if config_dict:
-            self.log.info(f'configs dict: {config_dict}')
-
-            configs = config_dict['configs']
-
-            self.log.info(f'configs: {configs}')
-
-            config_splits = split_list(configs, self.executors)
-
-            for i in range(self.executors):
-                return_conf['splits'][str(i)] = {'run_id': run_id, 'configs': config_splits[i]}
-
-        return return_conf
-
-    def run_pod(self, context):
-        return super().execute(context)
-
-
-class ConfigurableKubernetesPodOperator(KubernetesPodOperator):
-
-    def __init__(self,
-                 config_task_id,
-                 task_split,
-                 *args,
-                 **kwargs):
-        namespace = kwargs['namespace']
-        image = kwargs['image']
-        name = kwargs['name']
-
-        del kwargs['namespace']
-        del kwargs['image']
-        del kwargs['name']
-
-        super().__init__(
-            namespace=namespace,
-            image=image,
-            name=name,
-            *args,
-            **kwargs)
-
-        self.config_task_id = config_task_id
-        self.task_split = task_split
-
-    def execute(self, context):
-        if self.config_task_id:
-            ti = context['task_instance']
-
-            config = ti.xcom_pull(task_ids=self.config_task_id)
-
-            if config:
-                split = {}
-
-                if 'configs' in config:
-                    split = configs
-                else:
-                    split = config['splits'][str(self.task_split)]
-
-                self.log.info(split)
-
-                if split and split['configs']:
-                    self.env_vars.update({'DATA_PIPELINE_CONFIG': json.dumps(split)})
-                    return super().execute(context)
-                else:
-                    self.log.info(
-                        f'Empty split config for split {self.task_split}. split config: {split}. config: {config}')
-            else:
-                raise ValueError('Config not found in task: ' + self.config_task_id)
-        else:
-            self.env_vars.update({'DATA_PIPELINE_CONFIG': '{}'})
-            return super().execute(context)
diff --git a/rainbow/runners/airflow/operators/kubernetes_pod_operator_with_input_output.py b/rainbow/runners/airflow/operators/kubernetes_pod_operator_with_input_output.py
new file mode 100644
index 0000000..eb6fa83
--- /dev/null
+++ b/rainbow/runners/airflow/operators/kubernetes_pod_operator_with_input_output.py
@@ -0,0 +1,148 @@
+import json
+
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+
+
+def split_list(seq, num):
+    avg = len(seq) / float(num)
+    out = []
+    last = 0.0
+
+    while last < len(seq):
+        out.append(seq[int(last):int(last + avg)])
+        last += avg
+
+    return out
+
+
+_IS_SPLIT_KEY = 'is_split'
+
+
+class PrepareInputOperator(KubernetesPodOperator):
+
+    def __init__(self,
+                 input_type=None,
+                 input_path=None,
+                 split_input=False,
+                 executors=1,
+                 *args,
+                 **kwargs):
+        namespace = kwargs['namespace']
+        image = kwargs['image']
+        name = kwargs['name']
+
+        del kwargs['namespace']
+        del kwargs['image']
+        del kwargs['name']
+
+        super().__init__(
+            namespace=namespace,
+            image=image,
+            name=name,
+            *args,
+            **kwargs)
+
+        self.input_type = input_type
+        self.input_path = input_path
+        self.executors = executors
+        self.split_input = split_input
+
+    def execute(self, context):
+        input_dict = {}
+
+        self.log.info(f'config type: {self.input_type}')
+
+        ti = context['task_instance']
+
+        if self.input_type:
+            if self.input_type == 'file':
+                input_dict = {}  # future feature: return config from file
+            elif self.input_type == 'sql':
+                input_dict = {}  # future feature: return from sql config
+            elif self.input_type == 'task':
+                self.log.info(self.input_path)
+                input_dict = ti.xcom_pull(task_ids=self.input_path)
+            elif self.input_type == 'static':
+                input_dict = json.loads(self.input_path)
+            else:
+                raise ValueError(f'Unknown config type: {self.input_type}')
+
+        # TODO: pass run_id as well as env var
+        run_id = context['dag_run'].run_id
+        print(f'run_id = {run_id}')
+
+        if input_dict:
+            self.log.info(f'Generated input: {input_dict}')
+
+            if self.split_input:
+                input_splits = split_list(input_dict, self.executors)
+
+                ti.xcom_push(key=_IS_SPLIT_KEY, value=True)
+
+                return input_splits
+            else:
+                return input_dict
+        else:
+            return {}
+
+    def run_pod(self, context):
+        return super().execute(context)
+
+
+class KubernetesPodOperatorWithInputAndOutput(KubernetesPodOperator):
+    """
+    TODO: pydoc
+    """
+
+    _RAINBOW_INPUT_ENV_VAR = 'RAINBOW_INPUT'
+
+    def __init__(self,
+                 task_split,
+                 input_task_id=None,
+                 *args,
+                 **kwargs):
+        namespace = kwargs['namespace']
+        image = kwargs['image']
+        name = kwargs['name']
+
+        del kwargs['namespace']
+        del kwargs['image']
+        del kwargs['name']
+
+        super().__init__(
+            namespace=namespace,
+            image=image,
+            name=name,
+            *args,
+            **kwargs)
+
+        self.input_task_id = input_task_id
+        self.task_split = task_split
+
+    def execute(self, context):
+        task_input = {}
+
+        if self.input_task_id:
+            ti = context['task_instance']
+
+            self.log.info(f'Fetching input for task {self.task_split}.')
+
+            task_input = ti.xcom_pull(task_ids=self.input_task_id)
+
+            is_split = ti.xcom_pull(task_ids=self.input_task_id, key=_IS_SPLIT_KEY)
+            self.log.info(f'is_split = {is_split}')
+            if is_split:
+                self.log.info(f'Fetching split {self.task_split} of input.')
+
+                task_input = task_input[self.task_split]
+
+        if task_input:
+            self.log.info(f'task input = {task_input}')
+
+            self.env_vars.update({self._RAINBOW_INPUT_ENV_VAR: json.dumps(task_input)})
+        else:
+            self.env_vars.update({self._RAINBOW_INPUT_ENV_VAR: '{}'})
+
+            self.log.info(f'Empty input for task {self.task_split}.')
+
+        return super().execute(context)
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index ac46d0b..8bd11cf 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -21,9 +21,9 @@ from airflow.models import Variable
 from airflow.operators.dummy_operator import DummyOperator

 from rainbow.runners.airflow.model import task
-from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
-    ConfigurableKubernetesPodOperator, \
-    ConfigureParallelExecutionOperator
+from rainbow.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
+    KubernetesPodOperatorWithInputAndOutput, \
+    PrepareInputOperator


 class PythonTask(task.Task):
@@ -42,25 +42,24 @@ class PythonTask(task.Task):
         self.env_vars = self.__env_vars()
         self.kubernetes_kwargs = self.__kubernetes_kwargs()
         self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
-        self.config_task_id = self.task_name + '_input'
+        self.input_task_id = self.task_name + '_input'
         self.executors = self.__executors()

     def apply_task_to_dag(self):
-
-        config_task = None
+        input_task = None

         if self.input_type in ['static', 'task']:
-            config_task = self.__config_task(config_task)
+            input_task = self.__input_task()

         if self.executors == 1:
-            return self.__apply_task_to_dag_single_executor(config_task)
+            return self.__apply_task_to_dag_single_executor(input_task)
         else:
-            return self.__apply_task_to_dag_multiple_executors(config_task)
+            return self.__apply_task_to_dag_multiple_executors(input_task)

-    def __apply_task_to_dag_multiple_executors(self, config_task):
-        if not config_task:
-            config_task = DummyOperator(
-                task_id=self.config_task_id,
+    def __apply_task_to_dag_multiple_executors(self, input_task):
+        if not input_task:
+            input_task = DummyOperator(
+                task_id=self.input_task_id,
                 trigger_rule=self.trigger_rule,
                 dag=self.dag
             )
@@ -71,7 +70,7 @@ class PythonTask(task.Task):
         )

         if self.parent:
-            self.parent.set_downstream(config_task)
+            self.parent.set_downstream(input_task)

         for i in range(self.executors):
             split_task = self.__create_pod_operator(
@@ -80,51 +79,51 @@ class PythonTask(task.Task):
                 image=self.image
             )

-            config_task.set_downstream(split_task)
+            input_task.set_downstream(split_task)
             split_task.set_downstream(end_task)

         return end_task

     def __create_pod_operator(self, task_id, task_split, image):
-        return ConfigurableKubernetesPodOperator(
+        return KubernetesPodOperatorWithInputAndOutput(
             task_id=task_id,
-            config_task_id=self.config_task_id,
-            task_split=task_split,
+            input_task_id=self.input_task_id,
+            task_split=task_split if task_split else 0,
             image=image,
             cmds=self.cmds,
             arguments=self.arguments,
             **self.kubernetes_kwargs
         )

-    def __apply_task_to_dag_single_executor(self, config_task):
+    def __apply_task_to_dag_single_executor(self, input_task):
         pod_task = self.__create_pod_operator(
             task_id=f'{self.task_name}',
-            task_split=0,
+            task_split=None,
             image=f'''{self.image}'''
         )

         first_task = pod_task

-        if config_task:
-            first_task = config_task
+        if input_task:
+            first_task = input_task
             first_task.set_downstream(pod_task)

         if self.parent:
             self.parent.set_downstream(first_task)

         return pod_task

-    def __config_task(self, config_task):
-        self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
-        config_task = ConfigureParallelExecutionOperator(
-            task_id=self.config_task_id,
+    def __input_task(self):
+        return PrepareInputOperator(
+            task_id=self.input_task_id,
             image=self.image,
-            config_type=self.input_type,
-            config_path=self.input_path,
+            input_type=self.input_type,
+            input_path=self.input_path,
+            split_input=True if 'split_input' in self.config and
+                                self.config['split_input'] else False,
             executors=self.executors,
             **self.kubernetes_kwargs
         )
-        return config_task

     def __executors(self):
         executors = 1
diff --git a/tests/runners/airflow/build/python/test_python_image.py b/tests/runners/airflow/build/python/test_python_image.py
index 368b05d..d190fba 100644
--- a/tests/runners/airflow/build/python/test_python_image.py
+++ b/tests/runners/airflow/build/python/test_python_image.py
@@ -29,16 +29,24 @@ class TestPythonImage(TestCase):

         image_name = config['image']

-        PythonImage().build('tests/runners/airflow/rainbow', 'hello_world', 'image_name')
+        PythonImage().build('tests/runners/airflow/rainbow', 'hello_world', image_name)

         # TODO: elaborate test of image, validate input/output
         docker_client = docker.from_env()
         docker_client.images.get(image_name)
-        container_log = docker_client.containers.run(image_name, "python hello_world.py")
+
+        cmd = 'export RAINBOW_INPUT="{}" && ' + \
+              'sh container-setup.sh && ' + \
+              'python hello_world.py && ' + \
+              'sh container-teardown.sh'
+        cmds = ['/bin/bash', '-c', cmd]
+
+        container_log = docker_client.containers.run(image_name, cmds)
+
         docker_client.close()

-        self.assertEqual("b'Hello world!\\n'", str(container_log))
+        self.assertEqual("b'Hello world!\\n\\n{}\\n'", str(container_log))

     @staticmethod
     def __create_conf(task_id):
diff --git a/tests/runners/airflow/build/test_build_rainbow.py b/tests/runners/airflow/build/test_build_rainbow.py
index 533848f..0817d6c 100644
--- a/tests/runners/airflow/build/test_build_rainbow.py
+++ b/tests/runners/airflow/build/test_build_rainbow.py
@@ -9,7 +9,7 @@ class TestBuildRainbow(TestCase):
     def test_build_rainbow(self):
         docker_client = docker.client.from_env()

-        image_names = ['rainbow_image', 'rainbow_image2']
+        image_names = ['my_static_input_task_image', 'my_task_output_input_task_image']

         for image_name in image_names:
             if len(docker_client.images.list(image_name)) > 0:
diff --git a/tests/runners/airflow/rainbow/hello_world/hello_world.py b/tests/runners/airflow/rainbow/hello_world/hello_world.py
index 9b87c05..3eae465 100644
--- a/tests/runners/airflow/rainbow/hello_world/hello_world.py
+++ b/tests/runners/airflow/rainbow/hello_world/hello_world.py
@@ -15,4 +15,13 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import json
+
 print('Hello world!')
+print()
+
+with open('/rainbow_input.json') as file:
+    print(json.loads(file.readline()))
+
+with open('/output.json', 'w') as file:
+    file.write(json.dumps({'a': 1, 'b': 2}))
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index 3e3ec4b..2000621 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -25,28 +25,41 @@ pipelines:
     schedule: 0 * 1 * *
     metrics-namespace: TestNamespace
     tasks:
-      - task: my_static_config_task
+      - task: my_static_input_task
         type: python
-        description: my 1st ds task
-        image: rainbow_image
+        description: static input task
+        image: my_static_input_task_image
         source: hello_world
         env_vars:
           env1: "a"
           env2: "b"
         input_type: static
-        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
-        cmd: 'python hello_world.py'
-      - task: my_static_config_task2
+        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+        output_path: /output.json
+        cmd: python hello_world.py
+#      - task: my_parallelized_static_input_task
+#        type: python
+#        description: parallelized static input task
+#        image: my_static_input_task_image
+#        env_vars:
+#          env1: "a"
+#          env2: "b"
+#        input_type: static
+#        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+#        split_input: True
+#        executors: 2
+#        cmd: python hello_world.py
+      - task: my_task_output_input_task
         type: python
-        description: my 1st ds task
-        image: rainbow_image2
+        description: parallelized static input task
+        image: my_task_output_input_task_image
         source: hello_world
         env_vars:
           env1: "a"
           env2: "b"
-        input_type: static
-        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
-        cmd: 'python hello_world.py'
+        input_type: task
+        input_path: my_static_input_task
+        cmd: python hello_world.py
   services:
     - service:
       name: myserver1
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index ffdcac3..260f71d 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -19,8 +19,8 @@
 import unittest

 from unittest import TestCase

-from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
-    ConfigurableKubernetesPodOperator
+from rainbow.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
+    KubernetesPodOperatorWithInputAndOutput
 from rainbow.runners.airflow.tasks import python
 from tests.util import dag_test_utils

@@ -41,7 +41,7 @@ class TestPythonTask(TestCase):
         self.assertEqual(len(dag.tasks), 1)

         dag_task0 = dag.tasks[0]
-        self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
+        self.assertIsInstance(dag_task0, KubernetesPodOperatorWithInputAndOutput)

         self.assertEqual(dag_task0.task_id, task_id)

     @staticmethod
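
For context on the contract this commit introduces: a task container receives its input through the RAINBOW_INPUT environment variable, which container-setup.sh writes to /rainbow_input.json, and reports its result by writing /output.json (the output_path configured in rainbow.yml, presumably relayed back to Airflow XCom via container-teardown.sh and the AIRFLOW_RETURN_FILE seen in container-setup.sh). A minimal task script following that convention might look like the sketch below; the file paths match the commit above, while the 'records' output field is purely illustrative:

    import json

    # container-setup.sh materializes $RAINBOW_INPUT as /rainbow_input.json
    with open('/rainbow_input.json') as f:
        task_input = json.load(f)

    # ... do the actual work on task_input here ...
    result = {'records': len(task_input)}  # illustrative output payload

    # write output to the configured output_path; container-teardown.sh is
    # expected to pick this up after the task exits
    with open('/output.json', 'w') as f:
        json.dump(result, f)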
