This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 9292bb85b77084f31c48c1c982a5a7f593fefa66
Author: aviemzur <[email protected]>
AuthorDate: Thu Mar 12 10:14:55 2020 +0200

    Refactor PythonTask
---
 rainbow/runners/airflow/tasks/python.py | 122 +++++++++++++++++---------
 1 file changed, 65 insertions(+), 57 deletions(-)

diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index 8317854..eb00c0e 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -58,76 +58,84 @@ class PythonTask(task.Task):
 
     def apply_task_to_dag(self):
 
-        def create_pod_operator(task_id, task_split, image):
-            return ConfigurableKubernetesPodOperator(
-                task_id=task_id,
-                config_task_id=self.config_task_id,
-                task_split=task_split,
-                image=image,
-                cmds=self.cmds,
-                arguments=self.arguments,
-                **self.kubernetes_kwargs
-            )
-
         config_task = None
 
         if self.input_type in ['static', 'task']:
-            self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
-
-            config_task = ConfigureParallelExecutionOperator(
-                task_id=self.config_task_id,
-                image=self.image,
-                config_type=self.input_type,
-                config_path=self.input_path,
-                executors=self.executors,
-                **self.kubernetes_kwargs
-            )
+            config_task = self.__config_task(config_task)
 
         if self.executors == 1:
-            pod_task = create_pod_operator(
-                task_id=f'{self.task_name}',
-                task_split=0,
-                image=f'''{self.image}'''
-            )
-
-            first_task = pod_task
-
-            if config_task:
-                first_task = config_task
-                first_task.set_downstream(pod_task)
-
-            if self.parent:
-                self.parent.set_downstream(first_task)
-
-            return pod_task
+            return self.__apply_task_to_dag_single_executor(config_task)
         else:
-            if not config_task:
-                config_task = DummyOperator(
-                    task_id=self.config_task_id,
-                    trigger_rule=self.trigger_rule,
-                    dag=self.dag
-                )
+            return self.__apply_task_to_dag_multiple_executors(config_task)
 
-            end_task = DummyOperator(
-                task_id=self.task_name,
+    def __apply_task_to_dag_multiple_executors(self, config_task):
+        if not config_task:
+            config_task = DummyOperator(
+                task_id=self.config_task_id,
+                trigger_rule=self.trigger_rule,
                 dag=self.dag
             )
 
-            if self.parent:
-                self.parent.set_downstream(config_task)
-
-            for i in range(self.executors):
-                split_task = create_pod_operator(
-                    task_id=f'''{self.task_name}_{i}''',
-                    task_split=i,
-                    image=self.image
-                )
+        end_task = DummyOperator(
+            task_id=self.task_name,
+            dag=self.dag
+        )
 
-                config_task.set_downstream(split_task)
+        if self.parent:
+            self.parent.set_downstream(config_task)
 
-                split_task.set_downstream(end_task)
+        for i in range(self.executors):
+            split_task = self.__create_pod_operator(
+                task_id=f'''{self.task_name}_{i}''',
+                task_split=i,
+                image=self.image
+            )
 
-            return end_task
+            config_task.set_downstream(split_task)
+
+            split_task.set_downstream(end_task)
+
+        return end_task
+
+    def __create_pod_operator(self, task_id, task_split, image):
+        return ConfigurableKubernetesPodOperator(
+            task_id=task_id,
+            config_task_id=self.config_task_id,
+            task_split=task_split,
+            image=image,
+            cmds=self.cmds,
+            arguments=self.arguments,
+            **self.kubernetes_kwargs
+        )
+
+    def __apply_task_to_dag_single_executor(self, config_task):
+        pod_task = self.__create_pod_operator(
+            task_id=f'{self.task_name}',
+            task_split=0,
+            image=f'''{self.image}'''
+        )
+
+        first_task = pod_task
+
+        if config_task:
+            first_task = config_task
+            first_task.set_downstream(pod_task)
+        if self.parent:
+            self.parent.set_downstream(first_task)
+
+        return pod_task
+
+    def __config_task(self, config_task):
+        self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
+        config_task = ConfigureParallelExecutionOperator(
+            task_id=self.config_task_id,
+            image=self.image,
+            config_type=self.input_type,
+            config_path=self.input_path,
+            executors=self.executors,
+            **self.kubernetes_kwargs
+        )
+        return config_task
 
     def __executors(self):
         executors = 1
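
For readers following the patch: the refactor is behavior-preserving. It extracts the inline logic of apply_task_to_dag into four private methods (__config_task, __create_pod_operator, __apply_task_to_dag_single_executor, __apply_task_to_dag_multiple_executors). The multiple-executors path wires a fan-out/fan-in graph: one upstream config task feeds `executors` parallel pod tasks, which all converge on a single end task. The following is a minimal stand-alone sketch of that shape only; StubOperator, build_fan_out_fan_in, and the '_config' task-id suffix are illustrative stand-ins for this example, not Liminal or Airflow APIs.

class StubOperator:
    """Stands in for an Airflow operator: records downstream edges only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Plays the edge-creation role of BaseOperator.set_downstream.
        self.downstream.append(other)


def build_fan_out_fan_in(task_name, executors, parent=None):
    # One config task fans out to `executors` split tasks...
    config_task = StubOperator(task_name + '_config')  # hypothetical task id
    end_task = StubOperator(task_name)

    if parent:
        parent.set_downstream(config_task)

    for i in range(executors):
        split_task = StubOperator(f'{task_name}_{i}')  # one pod per split
        config_task.set_downstream(split_task)
        # ...and every split fans back in to the single end task.
        split_task.set_downstream(end_task)

    # Returning end_task mirrors the refactored method, so callers can
    # chain further tasks off the fan-in point.
    return end_task


end = build_fan_out_fan_in('my_task', executors=3)
print([t.task_id for t in end.downstream])  # [] - the end task is the sink

With executors=3 this yields one config node with three downstream splits, each pointing at the same end node; returning the end task is what lets the caller chain subsequent tasks after the whole parallel group, just as the refactored __apply_task_to_dag_multiple_executors does.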
