This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch wayang-211 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit cecaf8ec971beafb689fa6f4344b364c88be8e15 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Mon Apr 11 11:19:11 2022 +0200 [WAYANG-#211] restructure the channels for Python-Platform Signed-off-by: bertty <[email protected]> --- python/src/pywy/platforms/commons/__init__.py | 16 +++++++++ .../pywy/platforms/{python => commons}/channels.py | 28 ++++----------- python/src/pywy/platforms/python/channels.py | 40 +--------------------- .../platforms/python/operator/py_sink_textfile.py | 3 +- .../python/operator/py_source_textfile.py | 5 ++- .../platforms/python/operator/py_unary_filter.py | 27 ++++++++------- .../platforms/python/operator/py_unary_flatmap.py | 27 ++++++++------- .../pywy/platforms/python/operator/py_unary_map.py | 27 ++++++++------- 8 files changed, 68 insertions(+), 105 deletions(-) diff --git a/python/src/pywy/platforms/commons/__init__.py b/python/src/pywy/platforms/commons/__init__.py new file mode 100644 index 00000000..8d2bad81 --- /dev/null +++ b/python/src/pywy/platforms/commons/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# \ No newline at end of file diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/commons/channels.py similarity index 66% copy from python/src/pywy/platforms/python/channels.py copy to python/src/pywy/platforms/commons/channels.py index 0d0f65e6..743a7169 100644 --- a/python/src/pywy/platforms/python/channels.py +++ b/python/src/pywy/platforms/commons/channels.py @@ -15,26 +15,11 @@ # limitations under the License. # -from typing import ( Iterable, Callable ) +from typing import Callable from pywy.core import (Channel, ChannelDescriptor) -class PyIteratorChannel(Channel): - - iterable: Iterable - - def __init__(self): - Channel.__init__(self) - - def provide_iterable(self) -> Iterable: - return self.iterable - - def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel': - self.iterable = iterable - return self - - -class PyCallableChannel(Channel): +class CommonsCallableChannel(Channel): udf: Callable @@ -44,7 +29,7 @@ class PyCallableChannel(Channel): def provide_callable(self) -> Callable: return self.udf - def accept_callable(self, udf: Callable) -> 'PyCallableChannel': + def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel': self.udf = udf return self @@ -55,7 +40,7 @@ class PyCallableChannel(Channel): return executable -class PyFileChannel(Channel): +class CommonsFileChannel(Channel): path: str @@ -70,6 +55,5 @@ class PyFileChannel(Channel): return self -PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False) -PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False) -PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False) +COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False) +COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False) diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py index 0d0f65e6..f79a67ed 100644 --- a/python/src/pywy/platforms/python/channels.py +++ b/python/src/pywy/platforms/python/channels.py @@ -15,7 +15,7 @@ # limitations under the License. # -from typing import ( Iterable, Callable ) +from typing import Iterable from pywy.core import (Channel, ChannelDescriptor) @@ -34,42 +34,4 @@ class PyIteratorChannel(Channel): return self -class PyCallableChannel(Channel): - - udf: Callable - - def __init__(self): - Channel.__init__(self) - - def provide_callable(self) -> Callable: - return self.udf - - def accept_callable(self, udf: Callable) -> 'PyCallableChannel': - self.udf = udf - return self - - @staticmethod - def concatenate(function_a: Callable, function_b: Callable): - def executable(iterable): - return function_a(function_b(iterable)) - return executable - - -class PyFileChannel(Channel): - - path: str - - def __init__(self): - Channel.__init__(self) - - def provide_path(self) -> str: - return self.path - - def accept_path(self, path: str) -> 'PyIteratorChannel': - self.path = path - return self - - PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False) -PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False) -PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False) diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py index 9914382f..7d8eec1b 100644 --- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py @@ -17,11 +17,10 @@ from typing import Set, List, Type -from pywy.core.channel import CH_T +from pywy.core.channel import (CH_T, ChannelDescriptor) from pywy.operators.sink import TextFileSink from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - ChannelDescriptor, PyIteratorChannel, PY_ITERATOR_CHANNEL_DESCRIPTOR ) diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py index 30831880..245d090d 100644 --- a/python/src/pywy/platforms/python/operator/py_source_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py @@ -17,14 +17,13 @@ from typing import Set, List, Type -from pywy.core.channel import CH_T +from pywy.core.channel import (CH_T, ChannelDescriptor) from pywy.operators.source import TextFileSource from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - ChannelDescriptor, PyIteratorChannel, PY_ITERATOR_CHANNEL_DESCRIPTOR - ) +) class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator): diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py index 0788e974..2d807282 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_filter.py +++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py @@ -17,16 +17,17 @@ from typing import Set, List, Type -from pywy.core.channel import CH_T +from pywy.core.channel import CH_T, ChannelDescriptor from pywy.operators.unary import FilterOperator from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator +from pywy.platforms.commons.channels import ( + COMMONS_CALLABLE_CHANNEL_DESCRIPTOR, + CommonsCallableChannel +) from pywy.platforms.python.channels import ( - ChannelDescriptor, - PyIteratorChannel, - PY_ITERATOR_CHANNEL_DESCRIPTOR, - PY_CALLABLE_CHANNEL_DESCRIPTOR, - PyCallableChannel - ) + PyIteratorChannel, + PY_ITERATOR_CHANNEL_DESCRIPTOR, +) class PyFilterOperator(FilterOperator, PyExecutionOperator): @@ -43,15 +44,15 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator): py_in_iter_channel: PyIteratorChannel = inputs[0] py_out_iter_channel: PyIteratorChannel = outputs[0] py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable())) - elif isinstance(inputs[0], PyCallableChannel): - py_in_call_channel: PyCallableChannel = inputs[0] - py_out_call_channel: PyCallableChannel = outputs[0] + elif isinstance(inputs[0], CommonsCallableChannel): + py_in_call_channel: CommonsCallableChannel = inputs[0] + py_out_call_channel: CommonsCallableChannel = outputs[0] def func(iterator): return filter(udf, iterator) py_out_call_channel.accept_callable( - PyCallableChannel.concatenate( + CommonsCallableChannel.concatenate( func, py_in_call_channel.provide_callable() ) @@ -60,7 +61,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator): raise Exception("Channel Type does not supported") def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR} def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR} diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py index 97f467d4..72016a8c 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py +++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py @@ -18,16 +18,17 @@ from itertools import chain from typing import Set, List, Type -from pywy.core.channel import CH_T +from pywy.core.channel import (CH_T, ChannelDescriptor) from pywy.operators.unary import FlatmapOperator from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator +from pywy.platforms.commons.channels import ( + COMMONS_CALLABLE_CHANNEL_DESCRIPTOR, + CommonsCallableChannel +) from pywy.platforms.python.channels import ( - ChannelDescriptor, - PyIteratorChannel, - PY_ITERATOR_CHANNEL_DESCRIPTOR, - PY_CALLABLE_CHANNEL_DESCRIPTOR, - PyCallableChannel - ) + PyIteratorChannel, + PY_ITERATOR_CHANNEL_DESCRIPTOR, +) class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator): @@ -43,15 +44,15 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator): py_in_iter_channel: PyIteratorChannel = inputs[0] py_out_iter_channel: PyIteratorChannel = outputs[0] py_out_iter_channel.accept_iterable(chain.from_iterable(map(udf, py_in_iter_channel.provide_iterable()))) - elif isinstance(inputs[0], PyCallableChannel): - py_in_call_channel: PyCallableChannel = inputs[0] - py_out_call_channel: PyCallableChannel = outputs[0] + elif isinstance(inputs[0], CommonsCallableChannel): + py_in_call_channel: CommonsCallableChannel = inputs[0] + py_out_call_channel: CommonsCallableChannel = outputs[0] def fm_func(iterator): return chain.from_iterable(map(udf, iterator)) py_out_call_channel.accept_callable( - PyCallableChannel.concatenate( + CommonsCallableChannel.concatenate( fm_func, py_in_call_channel.provide_callable() ) @@ -60,7 +61,7 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator): raise Exception("Channel Type does not supported") def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR} def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR} diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py index b8741a92..a8e53a48 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_map.py +++ b/python/src/pywy/platforms/python/operator/py_unary_map.py @@ -17,16 +17,17 @@ from typing import Set, List, Type -from pywy.core.channel import CH_T +from pywy.core.channel import (CH_T, ChannelDescriptor) from pywy.operators.unary import MapOperator from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator +from pywy.platforms.commons.channels import ( + COMMONS_CALLABLE_CHANNEL_DESCRIPTOR, + CommonsCallableChannel +) from pywy.platforms.python.channels import ( - ChannelDescriptor, - PyIteratorChannel, - PY_ITERATOR_CHANNEL_DESCRIPTOR, - PY_CALLABLE_CHANNEL_DESCRIPTOR, - PyCallableChannel - ) + PyIteratorChannel, + PY_ITERATOR_CHANNEL_DESCRIPTOR, +) class PyMapOperator(MapOperator, PyExecutionOperator): @@ -43,15 +44,15 @@ class PyMapOperator(MapOperator, PyExecutionOperator): py_in_iter_channel: PyIteratorChannel = inputs[0] py_out_iter_channel: PyIteratorChannel = outputs[0] py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable())) - elif isinstance(inputs[0], PyCallableChannel): - py_in_call_channel: PyCallableChannel = inputs[0] - py_out_call_channel: PyCallableChannel = outputs[0] + elif isinstance(inputs[0], CommonsCallableChannel): + py_in_call_channel: CommonsCallableChannel = inputs[0] + py_out_call_channel: CommonsCallableChannel = outputs[0] def func(iterator): return map(udf, iterator) py_out_call_channel.accept_callable( - PyCallableChannel.concatenate( + CommonsCallableChannel.concatenate( func, py_in_call_channel.provide_callable() ) @@ -60,7 +61,7 @@ class PyMapOperator(MapOperator, PyExecutionOperator): raise Exception("Channel Type does not supported") def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR} def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
