aviemzur commented on a change in pull request #5:
URL: https://github.com/apache/incubator-liminal/pull/5#discussion_r524043100
##########
File path: liminal/runners/airflow/tasks/python.py
##########
@@ -15,131 +15,118 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import json
+from typing import Optional
-from airflow.models import Variable
+from airflow.contrib.kubernetes.volume import Volume
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.operators.kubernetes_pod_operator import
KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
+from liminal.kubernetes import volume_util
from liminal.runners.airflow.config.standalone_variable_backend import
get_variable
from liminal.runners.airflow.model import task
-from
liminal.runners.airflow.operators.kubernetes_pod_operator_with_input_output
import \
- KubernetesPodOperatorWithInputAndOutput, \
- PrepareInputOperator
class PythonTask(task.Task):
"""
Python task.
"""
- def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
- super().__init__(dag, pipeline_name, parent, config, trigger_rule)
-
- self.input_type = self.config['input_type']
- self.input_path = self.config['input_path']
- self.task_name = self.config['task']
- self.image = self.config['image']
+ def __init__(self, dag, liminal_config, pipeline_config, task_config,
parent, trigger_rule):
+ super().__init__(dag, liminal_config, pipeline_config, task_config,
parent, trigger_rule)
+ self.task_name = self.task_config['task']
+ self.image = self.task_config['image']
+ self.volumes_config = self._volumes_config()
+ self.mounts = self.task_config.get('mounts', [])
+ self._create_local_volumes()
self.resources = self.__kubernetes_resources()
self.env_vars = self.__env_vars()
self.kubernetes_kwargs = self.__kubernetes_kwargs()
self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
- self.input_task_id = self.task_name + '_input'
- self.executors = self.__executors()
+ self.parallelism = self.__parallelism()
def apply_task_to_dag(self):
- input_task = None
+ if self.parallelism == 1:
+ return self.__apply_task_to_dag_single_executor()
+ else:
+ return self.__apply_task_to_dag_multiple_executors()
- if self.input_type in ['static', 'task']:
- input_task = self.__input_task()
+ def _volumes_config(self):
+ volumes_config = self.liminal_config.get('volumes', [])
- if self.executors == 1:
- return self.__apply_task_to_dag_single_executor(input_task)
- else:
- return self.__apply_task_to_dag_multiple_executors(input_task)
+ for volume in volumes_config:
+ if 'local' in volume:
+ volume['persistentVolumeClaim'] = {
+ 'claimName': f"{volume['volume']}-pvc"
+ }
- def __apply_task_to_dag_multiple_executors(self, input_task):
- if not input_task:
- input_task = DummyOperator(
- task_id=self.input_task_id,
- trigger_rule=self.trigger_rule,
- dag=self.dag
- )
+ return volumes_config
+
+ def __apply_task_to_dag_multiple_executors(self):
+ start_task = DummyOperator(
+ task_id=f'{self.task_name}_parallelize',
+ trigger_rule=self.trigger_rule,
+ dag=self.dag
+ )
end_task = DummyOperator(
task_id=self.task_name,
dag=self.dag
)
if self.parent:
- self.parent.set_downstream(input_task)
+ self.parent.set_downstream(start_task)
- for i in range(self.executors):
+ for i in range(self.parallelism):
split_task = self.__create_pod_operator(
- task_id=f'''{self.task_name}_{i}''',
- task_split=i,
- image=self.image
+ image=self.image,
+ task_id=i
)
- input_task.set_downstream(split_task)
+ start_task.set_downstream(split_task)
split_task.set_downstream(end_task)
return end_task
- def __create_pod_operator(self, task_id, task_split, image):
- return KubernetesPodOperatorWithInputAndOutput(
- task_id=task_id,
- input_task_id=self.input_task_id,
- task_split=task_split if task_split else 0,
+ def __create_pod_operator(self, image: str, task_id: Optional[int] = None):
+ env_vars = self.env_vars
+
+ if task_id is not None:
+ env_vars = self.env_vars.copy()
+ env_vars['LIMINAL_SPLIT_ID'] = str(task_id)
Review comment:
We need it.
It has to do with executors rather than volumes, but I added it in this
PR so that I can test the functionality I wanted in `test_python_image_builder`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]