assapin commented on a change in pull request #5:
URL: https://github.com/apache/incubator-liminal/pull/5#discussion_r521559299
##########
File path: README.md
##########
@@ -36,52 +36,53 @@ perform), application servers, and more.
## Example YAML config file
```yaml
+---
name: MyPipeline
-owner: Bosco Albert Baracus
+volumes:
+ - volume: myvol1
+ local:
+ path: /tmp/liminal_tests
pipelines:
- pipeline: my_pipeline
+ owner: Bosco Albert Baracus
start_date: 1970-01-01
timeout_minutes: 45
schedule: 0 * 1 * *
+ default_arg_loaded: check
Review comment:
Do you think these fields are self-explanatory? This YAML is getting
complicated and it's not easy to understand what it exemplifies.
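(For readability, here is a stripped-down sketch of the structure the example seems to aim for, using only keys and values that appear in the diff above; the indentation is an assumption, since the quoted diff flattens it.)
```yaml
---
name: MyPipeline
volumes:
  - volume: myvol1
    local:
      path: /tmp/liminal_tests
pipelines:
  - pipeline: my_pipeline
    owner: Bosco Albert Baracus
    schedule: 0 * 1 * *
    tasks:
      - task: my_python_task
        type: python
        image: my_python_task_img
        cmd: python -u write_inputs.py
```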
##########
File path: liminal/runners/airflow/tasks/python.py
##########
@@ -15,131 +15,118 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import json
+from typing import Optional
-from airflow.models import Variable
+from airflow.contrib.kubernetes.volume import Volume
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
+from liminal.kubernetes import volume_util
from liminal.runners.airflow.config.standalone_variable_backend import get_variable
from liminal.runners.airflow.model import task
-from liminal.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
- KubernetesPodOperatorWithInputAndOutput, \
- PrepareInputOperator
class PythonTask(task.Task):
"""
Python task.
"""
- def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
- super().__init__(dag, pipeline_name, parent, config, trigger_rule)
-
- self.input_type = self.config['input_type']
- self.input_path = self.config['input_path']
- self.task_name = self.config['task']
- self.image = self.config['image']
+ def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
+ super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+ self.task_name = self.task_config['task']
+ self.image = self.task_config['image']
+ self.volumes_config = self._volumes_config()
+ self.mounts = self.task_config.get('mounts', [])
+ self._create_local_volumes()
self.resources = self.__kubernetes_resources()
self.env_vars = self.__env_vars()
self.kubernetes_kwargs = self.__kubernetes_kwargs()
self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
- self.input_task_id = self.task_name + '_input'
- self.executors = self.__executors()
+ self.parallelism = self.__parallelism()
def apply_task_to_dag(self):
- input_task = None
+ if self.parallelism == 1:
+ return self.__apply_task_to_dag_single_executor()
+ else:
+ return self.__apply_task_to_dag_multiple_executors()
- if self.input_type in ['static', 'task']:
- input_task = self.__input_task()
+ def _volumes_config(self):
+ volumes_config = self.liminal_config.get('volumes', [])
- if self.executors == 1:
- return self.__apply_task_to_dag_single_executor(input_task)
- else:
- return self.__apply_task_to_dag_multiple_executors(input_task)
+ for volume in volumes_config:
+ if 'local' in volume:
+ volume['persistentVolumeClaim'] = {
+ 'claimName': f"{volume['volume']}-pvc"
+ }
- def __apply_task_to_dag_multiple_executors(self, input_task):
- if not input_task:
- input_task = DummyOperator(
- task_id=self.input_task_id,
- trigger_rule=self.trigger_rule,
- dag=self.dag
- )
+ return volumes_config
+
+ def __apply_task_to_dag_multiple_executors(self):
+ start_task = DummyOperator(
+ task_id=f'{self.task_name}_parallelize',
+ trigger_rule=self.trigger_rule,
+ dag=self.dag
+ )
end_task = DummyOperator(
task_id=self.task_name,
dag=self.dag
)
if self.parent:
- self.parent.set_downstream(input_task)
+ self.parent.set_downstream(start_task)
- for i in range(self.executors):
+ for i in range(self.parallelism):
split_task = self.__create_pod_operator(
- task_id=f'''{self.task_name}_{i}''',
- task_split=i,
- image=self.image
+ image=self.image,
+ task_id=i
)
- input_task.set_downstream(split_task)
+ start_task.set_downstream(split_task)
split_task.set_downstream(end_task)
return end_task
- def __create_pod_operator(self, task_id, task_split, image):
- return KubernetesPodOperatorWithInputAndOutput(
- task_id=task_id,
- input_task_id=self.input_task_id,
- task_split=task_split if task_split else 0,
+ def __create_pod_operator(self, image: str, task_id: Optional[int] = None):
+ env_vars = self.env_vars
+
+ if task_id is not None:
+ env_vars = self.env_vars.copy()
+ env_vars['LIMINAL_SPLIT_ID'] = str(task_id)
Review comment:
Is this functionality needed for volumes, or is it a leak from executors/parallelism?
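(For context, a minimal sketch of how user code inside the task container might consume this variable when running with multiple executors; the script itself is hypothetical, and only the LIMINAL_SPLIT_ID, NUM_FILES and NUM_SPLITS names come from the diff and the README example.)
```python
import os

# Hypothetical task script: each parallel pod gets its own LIMINAL_SPLIT_ID
# (set per executor in __create_pod_operator above) and uses it to pick its
# share of the work. NUM_FILES / NUM_SPLITS mirror the env_vars in the README.
split_id = int(os.environ.get('LIMINAL_SPLIT_ID', '0'))
num_files = int(os.environ.get('NUM_FILES', '10'))
num_splits = int(os.environ.get('NUM_SPLITS', '3'))

# Simple round-robin partitioning of file indices across the splits.
my_files = [i for i in range(num_files) if i % num_splits == split_id]
print(f'split {split_id} processes file indices {my_files}')
```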
##########
File path: README.md
##########
@@ -36,52 +36,53 @@ perform), application servers, and more.
## Example YAML config file
```yaml
+---
name: MyPipeline
-owner: Bosco Albert Baracus
+volumes:
+ - volume: myvol1
+ local:
+ path: /tmp/liminal_tests
pipelines:
- pipeline: my_pipeline
+ owner: Bosco Albert Baracus
start_date: 1970-01-01
timeout_minutes: 45
schedule: 0 * 1 * *
+ default_arg_loaded: check
+ default_array_loaded: [2, 3, 4]
+ default_object_loaded:
+ key1: val1
+ key2: val2
metrics:
- namespace: TestNamespace
- backends: [ 'cloudwatch' ]
+ namespace: TestNamespace
+ backends: [ ]
tasks:
- - task: my_static_input_task
+ - task: my_python_task
type: python
description: static input task
- image: my_static_input_task_image
- source: helloworld
+ image: my_python_task_img
+ source: write_inputs
env_vars:
- env1: "a"
- env2: "b"
- input_type: static
- input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
- output_path: /output.json
- cmd: python -u hello_world.py
- - task: my_parallelized_static_input_task
+ NUM_FILES: 10
+ NUM_SPLITS: 3
+ mounts:
+ - mount: mymount
+ volume: myvol1
+ path: /mnt/vol1
+ cmd: python -u write_inputs.py
+ - task: my_parallelized_python_task
type: python
- description: parallelized static input task
- image: my_static_input_task_image
+ description: parallelized python task
+ image: my_parallelized_python_task_img
+ source: write_outputs
env_vars:
- env1: "a"
- env2: "b"
- input_type: static
- input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
- split_input: True
- executors: 2
- cmd: python -u helloworld.py
- - task: my_task_output_input_task
- type: python
- description: task with input from other task's output
- image: my_task_output_input_task_image
- source: helloworld
- env_vars:
- env1: "a"
- env2: "b"
- input_type: task
- input_path: my_static_input_task
- cmd: python -u hello_world.py
+ FOO: BAR
+ parallelism: 3
+ mounts:
Review comment:
If we are expecting a DS to write this, can we add something to the README on how
to use this, local vs. regular volumes, etc.?
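(As a starting point, a sketch of what such a README snippet could look like; it covers only the local-volume case, since that is the only variant this diff shows, and the "<volume>-pvc" naming is inferred from the claimName convention in python.py above.)
```yaml
volumes:
  # A local volume is backed by a local path (here /tmp/liminal_tests); the code
  # in this PR maps it to a persistentVolumeClaim named "<volume>-pvc" (myvol1-pvc).
  - volume: myvol1
    local:
      path: /tmp/liminal_tests
pipelines:
  - pipeline: my_pipeline
    tasks:
      - task: my_python_task
        type: python
        image: my_python_task_img
        # The task attaches the volume through a mount:
        mounts:
          - mount: mymount
            volume: myvol1   # must reference a volume defined above
            path: /mnt/vol1  # mount path inside the task container
        cmd: python -u write_inputs.py
```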
##########
File path: README.md
##########
@@ -36,52 +36,53 @@ perform), application servers, and more.
## Example YAML config file
```yaml
+---
name: MyPipeline
-owner: Bosco Albert Baracus
+volumes:
+ - volume: myvol1
+ local:
+ path: /tmp/liminal_tests
pipelines:
- pipeline: my_pipeline
+ owner: Bosco Albert Baracus
start_date: 1970-01-01
timeout_minutes: 45
schedule: 0 * 1 * *
+ default_arg_loaded: check
+ default_array_loaded: [2, 3, 4]
+ default_object_loaded:
+ key1: val1
+ key2: val2
metrics:
- namespace: TestNamespace
- backends: [ 'cloudwatch' ]
+ namespace: TestNamespace
+ backends: [ ]
tasks:
- - task: my_static_input_task
+ - task: my_python_task
type: python
description: static input task
- image: my_static_input_task_image
- source: helloworld
+ image: my_python_task_img
+ source: write_inputs
env_vars:
- env1: "a"
- env2: "b"
- input_type: static
- input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
- output_path: /output.json
- cmd: python -u hello_world.py
- - task: my_parallelized_static_input_task
+ NUM_FILES: 10
+ NUM_SPLITS: 3
+ mounts:
+ - mount: mymount
+ volume: myvol1
+ path: /mnt/vol1
+ cmd: python -u write_inputs.py
+ - task: my_parallelized_python_task
type: python
- description: parallelized static input task
- image: my_static_input_task_image
+ description: parallelized python task
+ image: my_parallelized_python_task_img
+ source: write_outputs
env_vars:
- env1: "a"
- env2: "b"
- input_type: static
- input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
- split_input: True
- executors: 2
- cmd: python -u helloworld.py
- - task: my_task_output_input_task
- type: python
- description: task with input from other task's output
- image: my_task_output_input_task_image
- source: helloworld
- env_vars:
- env1: "a"
- env2: "b"
- input_type: task
- input_path: my_static_input_task
- cmd: python -u hello_world.py
+ FOO: BAR
+ parallelism: 3
Review comment:
As agreed, revert the terminology.
##########
File path: liminal/runners/airflow/tasks/python.py
##########
@@ -15,131 +15,118 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import json
+from typing import Optional
-from airflow.models import Variable
+from airflow.contrib.kubernetes.volume import Volume
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
+from liminal.kubernetes import volume_util
from liminal.runners.airflow.config.standalone_variable_backend import get_variable
from liminal.runners.airflow.model import task
-from liminal.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
- KubernetesPodOperatorWithInputAndOutput, \
- PrepareInputOperator
class PythonTask(task.Task):
"""
Python task.
"""
- def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
- super().__init__(dag, pipeline_name, parent, config, trigger_rule)
-
- self.input_type = self.config['input_type']
- self.input_path = self.config['input_path']
- self.task_name = self.config['task']
- self.image = self.config['image']
+ def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
+ super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+ self.task_name = self.task_config['task']
+ self.image = self.task_config['image']
+ self.volumes_config = self._volumes_config()
+ self.mounts = self.task_config.get('mounts', [])
+ self._create_local_volumes()
self.resources = self.__kubernetes_resources()
self.env_vars = self.__env_vars()
self.kubernetes_kwargs = self.__kubernetes_kwargs()
self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
- self.input_task_id = self.task_name + '_input'
- self.executors = self.__executors()
+ self.parallelism = self.__parallelism()
def apply_task_to_dag(self):
- input_task = None
+ if self.parallelism == 1:
+ return self.__apply_task_to_dag_single_executor()
+ else:
+ return self.__apply_task_to_dag_multiple_executors()
- if self.input_type in ['static', 'task']:
- input_task = self.__input_task()
+ def _volumes_config(self):
+ volumes_config = self.liminal_config.get('volumes', [])
- if self.executors == 1:
- return self.__apply_task_to_dag_single_executor(input_task)
- else:
- return self.__apply_task_to_dag_multiple_executors(input_task)
+ for volume in volumes_config:
+ if 'local' in volume:
+ volume['persistentVolumeClaim'] = {
+ 'claimName': f"{volume['volume']}-pvc"
+ }
- def __apply_task_to_dag_multiple_executors(self, input_task):
- if not input_task:
- input_task = DummyOperator(
- task_id=self.input_task_id,
- trigger_rule=self.trigger_rule,
- dag=self.dag
- )
+ return volumes_config
+
+ def __apply_task_to_dag_multiple_executors(self):
+ start_task = DummyOperator(
+ task_id=f'{self.task_name}_parallelize',
+ trigger_rule=self.trigger_rule,
+ dag=self.dag
+ )
end_task = DummyOperator(
task_id=self.task_name,
dag=self.dag
)
if self.parent:
- self.parent.set_downstream(input_task)
+ self.parent.set_downstream(start_task)
- for i in range(self.executors):
+ for i in range(self.parallelism):
Review comment:
More usage of the parallelism term here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]