aviemzur commented on a change in pull request #5:
URL: https://github.com/apache/incubator-liminal/pull/5#discussion_r524043100



##########
File path: liminal/runners/airflow/tasks/python.py
##########
@@ -15,131 +15,118 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import json
+from typing import Optional
 
-from airflow.models import Variable
+from airflow.contrib.kubernetes.volume import Volume
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.operators.dummy_operator import DummyOperator
 
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow.config.standalone_variable_backend import get_variable
 from liminal.runners.airflow.model import task
-from liminal.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
-    KubernetesPodOperatorWithInputAndOutput, \
-    PrepareInputOperator
 
 
 class PythonTask(task.Task):
     """
     Python task.
     """
 
-    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
-        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
-
-        self.input_type = self.config['input_type']
-        self.input_path = self.config['input_path']
-        self.task_name = self.config['task']
-        self.image = self.config['image']
+    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
+        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+        self.task_name = self.task_config['task']
+        self.image = self.task_config['image']
+        self.volumes_config = self._volumes_config()
+        self.mounts = self.task_config.get('mounts', [])
+        self._create_local_volumes()
         self.resources = self.__kubernetes_resources()
         self.env_vars = self.__env_vars()
         self.kubernetes_kwargs = self.__kubernetes_kwargs()
         self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
-        self.input_task_id = self.task_name + '_input'
-        self.executors = self.__executors()
+        self.parallelism = self.__parallelism()
 
     def apply_task_to_dag(self):
-        input_task = None
+        if self.parallelism == 1:
+            return self.__apply_task_to_dag_single_executor()
+        else:
+            return self.__apply_task_to_dag_multiple_executors()
 
-        if self.input_type in ['static', 'task']:
-            input_task = self.__input_task()
+    def _volumes_config(self):
+        volumes_config = self.liminal_config.get('volumes', [])
 
-        if self.executors == 1:
-            return self.__apply_task_to_dag_single_executor(input_task)
-        else:
-            return self.__apply_task_to_dag_multiple_executors(input_task)
+        for volume in volumes_config:
+            if 'local' in volume:
+                volume['persistentVolumeClaim'] = {
+                    'claimName': f"{volume['volume']}-pvc"
+                }
 
-    def __apply_task_to_dag_multiple_executors(self, input_task):
-        if not input_task:
-            input_task = DummyOperator(
-                task_id=self.input_task_id,
-                trigger_rule=self.trigger_rule,
-                dag=self.dag
-            )
+        return volumes_config
+
+    def __apply_task_to_dag_multiple_executors(self):
+        start_task = DummyOperator(
+            task_id=f'{self.task_name}_parallelize',
+            trigger_rule=self.trigger_rule,
+            dag=self.dag
+        )
 
         end_task = DummyOperator(
             task_id=self.task_name,
             dag=self.dag
         )
 
         if self.parent:
-            self.parent.set_downstream(input_task)
+            self.parent.set_downstream(start_task)
 
-            for i in range(self.executors):
+            for i in range(self.parallelism):
                 split_task = self.__create_pod_operator(
-                    task_id=f'''{self.task_name}_{i}''',
-                    task_split=i,
-                    image=self.image
+                    image=self.image,
+                    task_id=i
                 )
 
-                input_task.set_downstream(split_task)
+                start_task.set_downstream(split_task)
 
                 split_task.set_downstream(end_task)
 
         return end_task
 
-    def __create_pod_operator(self, task_id, task_split, image):
-        return KubernetesPodOperatorWithInputAndOutput(
-            task_id=task_id,
-            input_task_id=self.input_task_id,
-            task_split=task_split if task_split else 0,
+    def __create_pod_operator(self, image: str, task_id: Optional[int] = None):
+        env_vars = self.env_vars
+
+        if task_id is not None:
+            env_vars = self.env_vars.copy()
+            env_vars['LIMINAL_SPLIT_ID'] = str(task_id)

Review comment:
   We need it.
   It does have to do with executors rather than volumes, but I added it in this PR so I can test functionality I wanted in `test_python_image_builder`.
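   For context, a rough sketch of how a task container could consume the `LIMINAL_SPLIT_ID` this PR injects per executor. Only the split id env var comes from the PR itself; `LIMINAL_NUM_SPLITS`, the item list, and the modulo partitioning are hypothetical names and logic used just to make the sketch self-contained:

```python
import os

# LIMINAL_SPLIT_ID is set per split by PythonTask.__create_pod_operator in this PR.
# LIMINAL_NUM_SPLITS is a hypothetical variable, assumed here only so the sketch
# can partition work; the PR itself only injects the split id.
split_id = int(os.environ.get('LIMINAL_SPLIT_ID', '0'))
num_splits = int(os.environ.get('LIMINAL_NUM_SPLITS', '1'))

items = [f'record-{i}' for i in range(10)]

# Each of the pods started by __apply_task_to_dag_multiple_executors takes the
# slice of items whose index maps to its split id, so the splits cover the
# input without overlap.
for index, item in enumerate(items):
    if index % num_splits == split_id:
        print(f'split {split_id} handling {item}')
```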




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

