assapin commented on a change in pull request #5:
URL: https://github.com/apache/incubator-liminal/pull/5#discussion_r521559299



##########
File path: README.md
##########
@@ -36,52 +36,53 @@ perform), application servers,  and more.
 
 ## Example YAML config file
 ```yaml
+---
 name: MyPipeline
-owner: Bosco Albert Baracus
+volumes:
+  - volume: myvol1
+    local:
+      path: /tmp/liminal_tests
 pipelines:
   - pipeline: my_pipeline
+    owner: Bosco Albert Baracus
     start_date: 1970-01-01
     timeout_minutes: 45
     schedule: 0 * 1 * *
+    default_arg_loaded: check

Review comment:
      do you think these fields are self-explanatory? this YAML is getting complicated and it's not easy to understand what it exemplifies.

##########
File path: liminal/runners/airflow/tasks/python.py
##########
@@ -15,131 +15,118 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import json
+from typing import Optional
 
-from airflow.models import Variable
+from airflow.contrib.kubernetes.volume import Volume
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.operators.dummy_operator import DummyOperator
 
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow.config.standalone_variable_backend import get_variable
 from liminal.runners.airflow.model import task
-from liminal.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
-    KubernetesPodOperatorWithInputAndOutput, \
-    PrepareInputOperator
 
 
 class PythonTask(task.Task):
     """
     Python task.
     """
 
-    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
-        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
-
-        self.input_type = self.config['input_type']
-        self.input_path = self.config['input_path']
-        self.task_name = self.config['task']
-        self.image = self.config['image']
+    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
+        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+        self.task_name = self.task_config['task']
+        self.image = self.task_config['image']
+        self.volumes_config = self._volumes_config()
+        self.mounts = self.task_config.get('mounts', [])
+        self._create_local_volumes()
         self.resources = self.__kubernetes_resources()
         self.env_vars = self.__env_vars()
         self.kubernetes_kwargs = self.__kubernetes_kwargs()
         self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
-        self.input_task_id = self.task_name + '_input'
-        self.executors = self.__executors()
+        self.parallelism = self.__parallelism()
 
     def apply_task_to_dag(self):
-        input_task = None
+        if self.parallelism == 1:
+            return self.__apply_task_to_dag_single_executor()
+        else:
+            return self.__apply_task_to_dag_multiple_executors()
 
-        if self.input_type in ['static', 'task']:
-            input_task = self.__input_task()
+    def _volumes_config(self):
+        volumes_config = self.liminal_config.get('volumes', [])
 
-        if self.executors == 1:
-            return self.__apply_task_to_dag_single_executor(input_task)
-        else:
-            return self.__apply_task_to_dag_multiple_executors(input_task)
+        for volume in volumes_config:
+            if 'local' in volume:
+                volume['persistentVolumeClaim'] = {
+                    'claimName': f"{volume['volume']}-pvc"
+                }
 
-    def __apply_task_to_dag_multiple_executors(self, input_task):
-        if not input_task:
-            input_task = DummyOperator(
-                task_id=self.input_task_id,
-                trigger_rule=self.trigger_rule,
-                dag=self.dag
-            )
+        return volumes_config
+
+    def __apply_task_to_dag_multiple_executors(self):
+        start_task = DummyOperator(
+            task_id=f'{self.task_name}_parallelize',
+            trigger_rule=self.trigger_rule,
+            dag=self.dag
+        )
 
         end_task = DummyOperator(
             task_id=self.task_name,
             dag=self.dag
         )
 
         if self.parent:
-            self.parent.set_downstream(input_task)
+            self.parent.set_downstream(start_task)
 
-            for i in range(self.executors):
+            for i in range(self.parallelism):
                 split_task = self.__create_pod_operator(
-                    task_id=f'''{self.task_name}_{i}''',
-                    task_split=i,
-                    image=self.image
+                    image=self.image,
+                    task_id=i
                 )
 
-                input_task.set_downstream(split_task)
+                start_task.set_downstream(split_task)
 
                 split_task.set_downstream(end_task)
 
         return end_task
 
-    def __create_pod_operator(self, task_id, task_split, image):
-        return KubernetesPodOperatorWithInputAndOutput(
-            task_id=task_id,
-            input_task_id=self.input_task_id,
-            task_split=task_split if task_split else 0,
+    def __create_pod_operator(self, image: str, task_id: Optional[int] = None):
+        env_vars = self.env_vars
+
+        if task_id is not None:
+            env_vars = self.env_vars.copy()
+            env_vars['LIMINAL_SPLIT_ID'] = str(task_id)

Review comment:
      is this functionality needed for volumes, or is it a leak from executors/parallelism?
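
For context on the question above, a minimal standalone sketch of the per-executor env-var handling this comment refers to, based on the `__create_pod_operator` diff (the free-function name is hypothetical, not the PR's API):

```python
# Sketch of the env-var handling in __create_pod_operator above (behaviour
# taken from the diff): each parallel executor gets its own copy of the task
# env vars plus a LIMINAL_SPLIT_ID identifying its split.
from typing import Optional


def split_env_vars(base_env_vars: dict, task_id: Optional[int] = None) -> dict:
    env_vars = base_env_vars
    if task_id is not None:
        # copy so each executor mutates its own dict, not the shared one
        env_vars = base_env_vars.copy()
        env_vars['LIMINAL_SPLIT_ID'] = str(task_id)
    return env_vars


# e.g. split_env_vars({'FOO': 'BAR'}, 2) -> {'FOO': 'BAR', 'LIMINAL_SPLIT_ID': '2'}
```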

##########
File path: README.md
##########
@@ -36,52 +36,53 @@ perform), application servers,  and more.
 
 ## Example YAML config file
 ```yaml
+---
 name: MyPipeline
-owner: Bosco Albert Baracus
+volumes:
+  - volume: myvol1
+    local:
+      path: /tmp/liminal_tests
 pipelines:
   - pipeline: my_pipeline
+    owner: Bosco Albert Baracus
     start_date: 1970-01-01
     timeout_minutes: 45
     schedule: 0 * 1 * *
+    default_arg_loaded: check
+    default_array_loaded: [2, 3, 4]
+    default_object_loaded:
+      key1: val1
+      key2: val2
     metrics:
-     namespace: TestNamespace
-     backends: [ 'cloudwatch' ]
+      namespace: TestNamespace
+      backends: [ ]
     tasks:
-      - task: my_static_input_task
+      - task: my_python_task
         type: python
         description: static input task
-        image: my_static_input_task_image
-        source: helloworld
+        image: my_python_task_img
+        source: write_inputs
         env_vars:
-          env1: "a"
-          env2: "b"
-        input_type: static
-        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
-        output_path: /output.json
-        cmd: python -u hello_world.py
-      - task: my_parallelized_static_input_task
+          NUM_FILES: 10
+          NUM_SPLITS: 3
+        mounts:
+          - mount: mymount
+            volume: myvol1
+            path: /mnt/vol1
+        cmd: python -u write_inputs.py
+      - task: my_parallelized_python_task
         type: python
-        description: parallelized static input task
-        image: my_static_input_task_image
+        description: parallelized python task
+        image: my_parallelized_python_task_img
+        source: write_outputs
         env_vars:
-          env1: "a"
-          env2: "b"
-        input_type: static
-        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
-        split_input: True
-        executors: 2
-        cmd: python -u helloworld.py
-      - task: my_task_output_input_task
-        type: python
-        description: task with input from other task's output
-        image: my_task_output_input_task_image
-        source: helloworld
-        env_vars:
-          env1: "a"
-          env2: "b"
-        input_type: task
-        input_path: my_static_input_task
-        cmd: python -u hello_world.py
+          FOO: BAR
+        parallelism: 3
+        mounts:

Review comment:
      if we are expecting a DS to write this, can we add a section to the README on how to use local vs. regular volumes, etc.?
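
Related to the docs request above, a minimal standalone sketch (a hypothetical free-function form of the `_volumes_config` method shown in the diff) of how `local` volumes are translated, which is the behaviour such a README section would need to explain:

```python
# Sketch of the local-volume handling from _volumes_config in the diff:
# a volume declared with a `local` section is backed by a persistentVolumeClaim
# named "<volume>-pvc"; volumes without `local` are passed through unchanged.
def volumes_config(liminal_config: dict) -> list:
    volumes = liminal_config.get('volumes', [])
    for volume in volumes:
        if 'local' in volume:
            volume['persistentVolumeClaim'] = {'claimName': f"{volume['volume']}-pvc"}
    return volumes


# The README's example volume would map to a claim named "myvol1-pvc":
print(volumes_config({'volumes': [{'volume': 'myvol1', 'local': {'path': '/tmp/liminal_tests'}}]}))
```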

##########
File path: README.md
##########
@@ -36,52 +36,53 @@ perform), application servers,  and more.
 
 ## Example YAML config file
 ```yaml
+---
 name: MyPipeline
-owner: Bosco Albert Baracus
+volumes:
+  - volume: myvol1
+    local:
+      path: /tmp/liminal_tests
 pipelines:
   - pipeline: my_pipeline
+    owner: Bosco Albert Baracus
     start_date: 1970-01-01
     timeout_minutes: 45
     schedule: 0 * 1 * *
+    default_arg_loaded: check
+    default_array_loaded: [2, 3, 4]
+    default_object_loaded:
+      key1: val1
+      key2: val2
     metrics:
-     namespace: TestNamespace
-     backends: [ 'cloudwatch' ]
+      namespace: TestNamespace
+      backends: [ ]
     tasks:
-      - task: my_static_input_task
+      - task: my_python_task
         type: python
         description: static input task
-        image: my_static_input_task_image
-        source: helloworld
+        image: my_python_task_img
+        source: write_inputs
         env_vars:
-          env1: "a"
-          env2: "b"
-        input_type: static
-        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
-        output_path: /output.json
-        cmd: python -u hello_world.py
-      - task: my_parallelized_static_input_task
+          NUM_FILES: 10
+          NUM_SPLITS: 3
+        mounts:
+          - mount: mymount
+            volume: myvol1
+            path: /mnt/vol1
+        cmd: python -u write_inputs.py
+      - task: my_parallelized_python_task
         type: python
-        description: parallelized static input task
-        image: my_static_input_task_image
+        description: parallelized python task
+        image: my_parallelized_python_task_img
+        source: write_outputs
         env_vars:
-          env1: "a"
-          env2: "b"
-        input_type: static
-        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
-        split_input: True
-        executors: 2
-        cmd: python -u helloworld.py
-      - task: my_task_output_input_task
-        type: python
-        description: task with input from other task's output
-        image: my_task_output_input_task_image
-        source: helloworld
-        env_vars:
-          env1: "a"
-          env2: "b"
-        input_type: task
-        input_path: my_static_input_task
-        cmd: python -u hello_world.py
+          FOO: BAR
+        parallelism: 3

Review comment:
      as agreed, revert the terminology

##########
File path: liminal/runners/airflow/tasks/python.py
##########
@@ -15,131 +15,118 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import json
+from typing import Optional
 
-from airflow.models import Variable
+from airflow.contrib.kubernetes.volume import Volume
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.operators.dummy_operator import DummyOperator
 
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow.config.standalone_variable_backend import get_variable
 from liminal.runners.airflow.model import task
-from liminal.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
-    KubernetesPodOperatorWithInputAndOutput, \
-    PrepareInputOperator
 
 
 class PythonTask(task.Task):
     """
     Python task.
     """
 
-    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
-        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
-
-        self.input_type = self.config['input_type']
-        self.input_path = self.config['input_path']
-        self.task_name = self.config['task']
-        self.image = self.config['image']
+    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
+        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+        self.task_name = self.task_config['task']
+        self.image = self.task_config['image']
+        self.volumes_config = self._volumes_config()
+        self.mounts = self.task_config.get('mounts', [])
+        self._create_local_volumes()
         self.resources = self.__kubernetes_resources()
         self.env_vars = self.__env_vars()
         self.kubernetes_kwargs = self.__kubernetes_kwargs()
         self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
-        self.input_task_id = self.task_name + '_input'
-        self.executors = self.__executors()
+        self.parallelism = self.__parallelism()
 
     def apply_task_to_dag(self):
-        input_task = None
+        if self.parallelism == 1:
+            return self.__apply_task_to_dag_single_executor()
+        else:
+            return self.__apply_task_to_dag_multiple_executors()
 
-        if self.input_type in ['static', 'task']:
-            input_task = self.__input_task()
+    def _volumes_config(self):
+        volumes_config = self.liminal_config.get('volumes', [])
 
-        if self.executors == 1:
-            return self.__apply_task_to_dag_single_executor(input_task)
-        else:
-            return self.__apply_task_to_dag_multiple_executors(input_task)
+        for volume in volumes_config:
+            if 'local' in volume:
+                volume['persistentVolumeClaim'] = {
+                    'claimName': f"{volume['volume']}-pvc"
+                }
 
-    def __apply_task_to_dag_multiple_executors(self, input_task):
-        if not input_task:
-            input_task = DummyOperator(
-                task_id=self.input_task_id,
-                trigger_rule=self.trigger_rule,
-                dag=self.dag
-            )
+        return volumes_config
+
+    def __apply_task_to_dag_multiple_executors(self):
+        start_task = DummyOperator(
+            task_id=f'{self.task_name}_parallelize',
+            trigger_rule=self.trigger_rule,
+            dag=self.dag
+        )
 
         end_task = DummyOperator(
             task_id=self.task_name,
             dag=self.dag
         )
 
         if self.parent:
-            self.parent.set_downstream(input_task)
+            self.parent.set_downstream(start_task)
 
-            for i in range(self.executors):
+            for i in range(self.parallelism):

Review comment:
      more usage of the parallelism term




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

