This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 2894e5429d673fafd7186dcfbdeaae6b2b016090 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 11:24:32 2022 +0200 [WAYANG-#8] Seed creation of Platforms/python Signed-off-by: bertty <[email protected]> --- python/src/pywayang/dataquanta.py | 6 +-- python/src/pywayang/operator/base.py | 34 ++++++++++-- python/src/pywayang/operator/source.py | 4 +- python/src/pywayang/operator/unary.py | 16 ++---- python/src/pywayang/platforms/__init__.py | 0 python/src/pywayang/platforms/python/__init__.py | 0 python/src/pywayang/platforms/python/channels.py | 60 ++++++++++++++++++++++ .../pywayang/platforms/python/compiler/__init__.py | 0 .../platforms/python/execution/__init__.py | 0 python/src/pywayang/platforms/python/mappings.py | 35 +++++++++++++ .../platforms/python/operators/PyFilterOperator.py | 43 ++++++++++++++++ .../python/operators/PythonExecutionOperator.py | 7 +++ .../platforms/python/operators/__init__.py | 7 +++ .../pywayang/platforms/python/platform/__init__.py | 0 .../pywayang/platforms/python/plugin/__init__.py | 0 python/src/pywayang/test.py | 50 +++++++++++++++--- 16 files changed, 236 insertions(+), 26 deletions(-) diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py index 5f740941..3ccc2839 100644 --- a/python/src/pywayang/dataquanta.py +++ b/python/src/pywayang/dataquanta.py @@ -1,5 +1,5 @@ from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO) -from pywayang.operator.base import (BaseOperator) +from pywayang.operator.base import (WyOperator) from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator) @@ -7,9 +7,9 @@ class DataQuanta(GenericTco): """ Represents an intermediate result/data flow edge in a [[WayangPlan]]. """ - previous : BaseOperator = None + previous : WyOperator = None - def __init__(self, operator: BaseOperator): + def __init__(self, operator: WyOperator): self.operator = operator diff --git a/python/src/pywayang/operator/base.py b/python/src/pywayang/operator/base.py index ad2deed5..b7834d17 100644 --- a/python/src/pywayang/operator/base.py +++ b/python/src/pywayang/operator/base.py @@ -1,11 +1,13 @@ -from typing import (TypeVar, Optional, List) +from typing import (TypeVar, Optional, List, Set) +from pywayang.platforms.python.channels import ChannelDescriptor - -class BaseOperator: +class WyOperator: inputSlot : List[TypeVar] + inputChannel : ChannelDescriptor inputs : int outputSlot : List[TypeVar] + OutputChannel: ChannelDescriptor outputs: int def __init__(self, @@ -21,6 +23,32 @@ class BaseOperator: self.outputSlot = output self.outputs = output_lenght + def validateInputs(self, vec): + if len(vec) != self.inputs: + raise Exception( + "the inputs channel contains {} elements and need to have {}".format( + len(vec), + self.inputs + ) + ) + def validateOutputs(self, vec): + if len(vec) != self.outputs: + raise Exception( + "the output channel contains {} elements and need to have {}".format( + len(vec), + self.inputs + ) + ) + def validateChannels(self, input, output): + self.validateInputs(input) + self.validateOutputs(output) + + def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + pass + + def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + pass + def __str__(self): return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format( str(self.name), diff --git a/python/src/pywayang/operator/source.py b/python/src/pywayang/operator/source.py index 34a16644..97aff75a 100644 --- a/python/src/pywayang/operator/source.py +++ b/python/src/pywayang/operator/source.py @@ -1,6 +1,6 @@ -from pywayang.operator.base import BaseOperator +from pywayang.operator.base import WyOperator -class SourceUnaryOperator(BaseOperator): +class SourceUnaryOperator(WyOperator): def __init__(self, name:str): super().__init__(name, None, str, 0, 1) diff --git a/python/src/pywayang/operator/unary.py b/python/src/pywayang/operator/unary.py index 24e5df2f..559effa5 100644 --- a/python/src/pywayang/operator/unary.py +++ b/python/src/pywayang/operator/unary.py @@ -1,4 +1,4 @@ -from pywayang.operator.base import BaseOperator +from pywayang.operator.base import WyOperator from pywayang.types import ( GenericTco, GenericUco, @@ -12,7 +12,7 @@ from pywayang.types import ( from itertools import chain -class UnaryToUnaryOperator(BaseOperator): +class UnaryToUnaryOperator(WyOperator): def __init__(self, name:str, input:GenericTco, output:GenericUco): super().__init__(name, input, output, 1, 1) @@ -30,16 +30,10 @@ class FilterOperator(UnaryToUnaryOperator): predicate: Predicate def __init__(self, predicate: Predicate): - type = getTypePredicate(predicate) + type = getTypePredicate(predicate) if predicate else None super().__init__("FilterOperator", type, type) self.predicate = predicate - def getWrapper(self): - udf = self.predicate - def func(iterator): - return filter(udf, iterator) - return func - def __str__(self): return super().__str__() @@ -51,7 +45,7 @@ class MapOperator(UnaryToUnaryOperator): function: Function def __init__(self, function: Function): - types = getTypeFunction(function) + types = getTypeFunction(function) if function else (None, None) super().__init__("MapOperator", types[0], types[1]) self.function = function @@ -73,7 +67,7 @@ class FlatmapOperator(UnaryToUnaryOperator): fmfunction: FlatmapFunction def __init__(self, fmfunction: FlatmapFunction): - types = getTypeFlatmapFunction(fmfunction) + types = getTypeFlatmapFunction(fmfunction) if fmfunction else (None, None) super().__init__("FlatmapOperator", types[0], types[1]) self.fmfunction = fmfunction diff --git a/python/src/pywayang/platforms/__init__.py b/python/src/pywayang/platforms/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/src/pywayang/platforms/python/__init__.py b/python/src/pywayang/platforms/python/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py new file mode 100644 index 00000000..f611a258 --- /dev/null +++ b/python/src/pywayang/platforms/python/channels.py @@ -0,0 +1,60 @@ +from typing import ( Iterable, Callable ) + +class Channel: + + def __init__(self): + pass + + def getchannel(self) -> 'Channel': + return self + + def gettype(self): + return type(self) + +class ChannelDescriptor: + + def __init__(self, channelType: type, isReusable: bool, isSuitableForBreakpoint: bool): + self.channelType = channelType + self.isReusable = isReusable + self.isSuitableForBreakpoint = isSuitableForBreakpoint + + def create_instance(self) -> Channel: + return self.channelType() + + +class PyIteratorChannel(Channel): + + iterable : Iterable + + def __init__(self): + Channel.__init__(self) + + def provide_iterable(self) -> Iterable: + return self.iterable + + def accept_iterable(self, iterable) -> 'PyIteratorChannel': + self.iterable = iterable + return self + +class PyCallableChannel(Channel): + + udf : Callable + + def __init__(self): + Channel.__init__(self) + + def provide_callable(self) -> Callable: + return self.udf + + def accept_callable(self, udf: Callable) -> 'PyCallableChannel': + self.udf = udf + return self + + @staticmethod + def concatenate(function_a: Callable, function_b: Callable): + def executable(iterable): + return function_a(function_b(iterable)) + return executable + +PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False) +PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False) \ No newline at end of file diff --git a/python/src/pywayang/platforms/python/compiler/__init__.py b/python/src/pywayang/platforms/python/compiler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/src/pywayang/platforms/python/execution/__init__.py b/python/src/pywayang/platforms/python/execution/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py new file mode 100644 index 00000000..977ccada --- /dev/null +++ b/python/src/pywayang/platforms/python/mappings.py @@ -0,0 +1,35 @@ +from typing import Dict + +from pywayang.operator.base import WyOperator +from pywayang.platforms.python.operators import * + +class Mapping: + mappings: Dict[str, type] + + def __init__(self): + self.mappings = {} + + def add_mapping(self, operator: PythonExecutionOperator): + self.mappings[operator.name] = type(operator) + + def get_instanceof(self, operator: WyOperator): + template = self.mappings[operator.name] + if template is None: + raise Exception( + "the operator {} does not have valid mapping".format( + operator.name + ) + ) + return template(operator) + + + def __str__(self): + return str(self.mappings) + + def __repr__(self): + return self.__str__() + +OperatorMappings = Mapping() + +OperatorMappings.add_mapping(PyFilterOperator()) + diff --git a/python/src/pywayang/platforms/python/operators/PyFilterOperator.py b/python/src/pywayang/platforms/python/operators/PyFilterOperator.py new file mode 100644 index 00000000..f1d7dcb4 --- /dev/null +++ b/python/src/pywayang/platforms/python/operators/PyFilterOperator.py @@ -0,0 +1,43 @@ +from pywayang.operator.unary import FilterOperator +from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywayang.platforms.python.channels import (Channel, ChannelDescriptor, PyIteratorChannel, + PyIteratorChannelDescriptor, PyCallableChannelDescriptor, + PyCallableChannel) +from typing import Set + +class PyFilterOperator(FilterOperator, PythonExecutionOperator): + + def __init__(self, origin: FilterOperator = None): + predicate = None if origin is None else origin.predicate + super().__init__(predicate) + pass + + def execute(self, inputs: Channel, outputs: Channel): + self.validateChannels(inputs, outputs) + udf = self.predicate + if isinstance(inputs[0], PyIteratorChannel) : + py_in_iter_channel: PyIteratorChannel = inputs[0] + py_out_iter_channel: PyIteratorChannel = outputs[0] + py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable())) + elif isinstance(inputs[0], PyCallableChannel) : + py_in_call_channel: PyCallableChannel = inputs[0] + py_out_call_channel: PyCallableChannel = outputs[0] + + def func(iterator): + return filter(udf, iterator) + + py_out_call_channel.accept_callable( + PyCallableChannel.concatenate( + func, + py_in_call_channel.provide_callable() + ) + ) + else: + raise Exception("Channel Type does not supported") + + + def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor} + + def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor} diff --git a/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py b/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py new file mode 100644 index 00000000..4a79616c --- /dev/null +++ b/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py @@ -0,0 +1,7 @@ +from pywayang.operator.base import WyOperator +from pywayang.platforms.python.channels import Channel + +class PythonExecutionOperator(WyOperator): + + def execute(self, inputs: Channel, output: Channel): + pass \ No newline at end of file diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py new file mode 100644 index 00000000..208a2fc0 --- /dev/null +++ b/python/src/pywayang/platforms/python/operators/__init__.py @@ -0,0 +1,7 @@ +from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator + +__ALL__ = [ + PythonExecutionOperator, + PyFilterOperator +] \ No newline at end of file diff --git a/python/src/pywayang/platforms/python/platform/__init__.py b/python/src/pywayang/platforms/python/platform/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/src/pywayang/platforms/python/plugin/__init__.py b/python/src/pywayang/platforms/python/plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/src/pywayang/test.py b/python/src/pywayang/test.py index 66ddab0a..884acfc5 100644 --- a/python/src/pywayang/test.py +++ b/python/src/pywayang/test.py @@ -2,11 +2,17 @@ from typing import Iterable from pywayang.platform import Platform from pywayang.context import WayangContext +from pywayang.platforms.python.channels import Channel from pywayang.plugin import java, spark from pywayang.operator.unary import * p = Platform("nana") -print(p) +print("LALA "+str(p)) +pt = type(p) +print(pt) +p2 = pt("chao") +print(p2) +print(type(p2)) print(str(WayangContext().register(java, spark))) @@ -31,8 +37,8 @@ fileop = WayangContext()\ .textFile("here")\ filterop: FilterOperator = fileop.filter(pre).getOperator() -fop_pre = filterop.getWrapper() -fop_pre_res = fop_pre(["la", "lala"]) +#fop_pre = filterop.getWrapper() +#fop_pre_res = fop_pre(["la", "lala"]) #for i in fop_pre_res: # print(i) @@ -55,7 +61,37 @@ def concatenate(function_a, function_b): return function_b(function_a(iterable)) return executable -res = concatenate(concatenate(fop_pre, mop_func), fmop_func) -res_pro = res(["la", "lala"]) -for i in res_pro: - print(i) \ No newline at end of file +#res = concatenate(concatenate(fop_pre, mop_func), fmop_func) +#res_pro = res(["la", "lala"]) +#for i in res_pro: +# print(i) + +from pywayang.platforms.python.mappings import OperatorMappings +from pywayang.platforms.python.operators import * + +print(OperatorMappings) + +pyF = PyFilterOperator() +print(pyF) +print(pyF.getInputChannelDescriptors()) +print(type(pyF.getInputChannelDescriptors().pop().create_instance())) + +qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance() +print(qq) +print(type(qq)) +print("ads") + + +def pre_lala(a:str): + print("executed") + return len(a) > 3 + +ou1 = filter(pre_lala, ["la", "lala"]) +print(ou1) + +for i in ou1: + print(i) + +pyFM = OperatorMappings.get_instanceof(filterop) +print(pyFM) +print(type(pyFM)) \ No newline at end of file
