This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 8b20bc4eafeab2c83a94e20dfbde5c9ea4b72b70 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 15:38:49 2022 +0200 [WAYANG-#8] Change structure for PywyPlan Signed-off-by: bertty <[email protected]> --- python/src/pywy/context.py | 35 ------ python/src/pywy/dataquanta.py | 50 ++++++++- .../platforms/python/operators/PyFilterOperator.py | 6 +- .../python/operators/PyTextFileSinkOperator.py | 6 +- .../python/operators/PyTextFileSourceOperator.py | 6 +- python/src/pywy/test.py | 122 ++++++++++----------- python/src/pywy/translate/translator.py | 4 +- python/src/pywy/wayangplan/__init__.py | 16 ++- python/src/pywy/wayangplan/base.py | 24 ++-- python/src/pywy/wayangplan/sink.py | 17 +-- python/src/pywy/wayangplan/source.py | 10 +- python/src/pywy/wayangplan/wayang.py | 101 ++++++++++++++++- 12 files changed, 250 insertions(+), 147 deletions(-) diff --git a/python/src/pywy/context.py b/python/src/pywy/context.py deleted file mode 100644 index 1f2e883a..00000000 --- a/python/src/pywy/context.py +++ /dev/null @@ -1,35 +0,0 @@ -from pywy.platforms.basic.plugin import Plugin -from pywy.dataquanta import DataQuanta -from pywy.wayangplan.source import TextFileSource - -class WayangContext: - """ - This is the entry point for users to work with Wayang. - """ - def __init__(self): - self.plugins = set() - - """ - add a :class:`Plugin` to the :class:`Context` - """ - def register(self, *p: Plugin): - self.plugins.add(p) - return self - - """ - remove a :class:`Plugin` from the :class:`Context` - """ - def unregister(self, p: Plugin): - self.plugins.remove(p) - return self - - def textFile(self, file_path: str) -> DataQuanta[str]: - return DataQuanta(TextFileSource(file_path)) - - - def __str__(self): - return "Plugins: {} \n".format(str(self.plugins)) - - def __repr__(self): - return self.__str__() - diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index c4e7b15c..5544b3e6 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -1,5 +1,37 @@ from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO) from pywy.wayangplan import * +from pywy.wayangplan.wayang import PywyPlan +from pywy.platforms.basic.plugin import Plugin + +class WayangContext: + """ + This is the entry point for users to work with Wayang. + """ + def __init__(self): + self.plugins = set() + + """ + add a :class:`Plugin` to the :class:`Context` + """ + def register(self, *p: Plugin): + self.plugins.add(p) + return self + + """ + remove a :class:`Plugin` from the :class:`Context` + """ + def unregister(self, p: Plugin): + self.plugins.remove(p) + return self + + def textFile(self, file_path: str) -> 'DataQuanta[str]': + return DataQuanta(self, TextFileSource(file_path)) + + def __str__(self): + return "Plugins: {} \n".format(str(self.plugins)) + + def __repr__(self): + return self.__str__() class DataQuanta(GenericTco): """ @@ -7,22 +39,30 @@ class DataQuanta(GenericTco): """ previous : WyOperator = None - def __init__(self, operator: WyOperator): + def __init__(self, context:WayangContext, operator: WyOperator): self.operator = operator + self.context = context def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" : - return DataQuanta(FilterOperator(p)) + return DataQuanta(self.context, self.__connect(FilterOperator(p))) def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" : - return DataQuanta(MapOperator(f)) + return DataQuanta(self.context,self.__connect(MapOperator(f))) def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" : - return DataQuanta(FlatmapOperator(f)) + return DataQuanta(self.context,self.__connect(FlatmapOperator(f))) def storeTextFile(self: "DataQuanta[I]", path: str) : - last = DataQuanta(TextFileSink(path)) + last = self.__connect(TextFileSink(path)) + plan = PywyPlan(self.context.plugins, [last]) + plan.print() + # TODO add the logic to execute the plan + def __connect(self, op:WyOperator, port_op: int = 0) -> WyOperator: + self.operator.connect(0, op, port_op) + return op + def getOperator(self): return self.operator diff --git a/python/src/pywy/platforms/python/operators/PyFilterOperator.py b/python/src/pywy/platforms/python/operators/PyFilterOperator.py index 24b97d70..e01d4310 100644 --- a/python/src/pywy/platforms/python/operators/PyFilterOperator.py +++ b/python/src/pywy/platforms/python/operators/PyFilterOperator.py @@ -13,7 +13,7 @@ class PyFilterOperator(FilterOperator, PythonExecutionOperator): pass def execute(self, inputs: Channel, outputs: Channel): - self.validateChannels(inputs, outputs) + self.validate_channels(inputs, outputs) udf = self.predicate if isinstance(inputs[0], PyIteratorChannel) : py_in_iter_channel: PyIteratorChannel = inputs[0] @@ -36,8 +36,8 @@ class PyFilterOperator(FilterOperator, PythonExecutionOperator): raise Exception("Channel Type does not supported") - def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor} - def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor} diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py index 8b244119..11a779b4 100644 --- a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py +++ b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py @@ -16,7 +16,7 @@ class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator): pass def execute(self, inputs: Channel, outputs: Channel): - self.validateChannels(inputs, outputs) + self.validate_channels(inputs, outputs) if isinstance(inputs[0], PyIteratorChannel) : file = open(self.path,'w') py_in_iter_channel: PyIteratorChannel = inputs[0] @@ -29,8 +29,8 @@ class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator): raise Exception("Channel Type does not supported") - def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: return {PyIteratorChannelDescriptor} - def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: raise Exception("The PyTextFileSource does not support Output Channels") diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py index d228b0ef..cc36605b 100644 --- a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py +++ b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py @@ -17,7 +17,7 @@ class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator): pass def execute(self, inputs: Channel, outputs: Channel): - self.validateChannels(inputs, outputs) + self.validate_channels(inputs, outputs) if isinstance(outputs[0], PyIteratorChannel) : py_out_iter_channel: PyIteratorChannel = outputs[0] py_out_iter_channel.accept_iterable( @@ -31,8 +31,8 @@ class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator): raise Exception("Channel Type does not supported") - def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: raise Exception("The PyTextFileSource does not support Input Channels") - def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: return {PyIteratorChannelDescriptor} diff --git a/python/src/pywy/test.py b/python/src/pywy/test.py index ec4fbad0..de0153d6 100644 --- a/python/src/pywy/test.py +++ b/python/src/pywy/test.py @@ -1,7 +1,7 @@ from pywy.platforms.basic.platform import Platform -from pywy.context import WayangContext +from pywy.dataquanta import WayangContext from pywy.platforms.python.channels import Channel -from pywy.platforms.basic.plugin import java, spark +from pywy.plugins import java, spark from pywy.wayangplan.unary import * p = Platform("nana") @@ -36,62 +36,62 @@ fileop = WayangContext()\ .filter(pre)\ .storeTextFile("/Users/bertty/databloom/blossom/python/resources/test.output") -filterop: FilterOperator = fileop.filter(pre).getOperator() -#fop_pre = filterop.getWrapper() -#fop_pre_res = fop_pre(["la", "lala"]) -#for i in fop_pre_res: -# print(i) - - -mapop: MapOperator = fileop.map(func).getOperator() -mop_func = mapop.getWrapper() -mop_func_res = mop_func(["la", "lala"]) -#for i in mop_func_res: -# print(i) - - -fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator() -fmop_func = fmop.getWrapper() -fmop_func_res = fmop_func([2, 3]) -#for i in fmop_func_res: -# print(i) - -def concatenate(function_a, function_b): - def executable(iterable): - return function_b(function_a(iterable)) - return executable - -#res = concatenate(concatenate(fop_pre, mop_func), fmop_func) -#res_pro = res(["la", "lala"]) -#for i in res_pro: -# print(i) - -from pywy.platforms.python.mappings import PywyOperatorMappings -from pywy.platforms.python.operators import * - -print(PywyOperatorMappings) - -pyF = PyFilterOperator() -print(pyF) -print(pyF.getInputChannelDescriptors()) -print(type(pyF.getInputChannelDescriptors().pop().create_instance())) - -qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance() -print(qq) -print(type(qq)) -print("ads") - - -def pre_lala(a:str): - print("executed") - return len(a) > 3 - -ou1 = filter(pre_lala, ["la", "lala"]) -print(ou1) - -for i in ou1: - print(i) - -pyFM = PywyOperatorMappings.get_instanceof(filterop) -print(pyFM) -print(type(pyFM)) \ No newline at end of file +# filterop: FilterOperator = fileop.filter(pre).getOperator() +# #fop_pre = filterop.getWrapper() +# #fop_pre_res = fop_pre(["la", "lala"]) +# #for i in fop_pre_res: +# # print(i) +# +# +# mapop: MapOperator = fileop.map(func).getOperator() +# mop_func = mapop.getWrapper() +# mop_func_res = mop_func(["la", "lala"]) +# #for i in mop_func_res: +# # print(i) +# +# +# fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator() +# fmop_func = fmop.getWrapper() +# fmop_func_res = fmop_func([2, 3]) +# #for i in fmop_func_res: +# # print(i) +# +# def concatenate(function_a, function_b): +# def executable(iterable): +# return function_b(function_a(iterable)) +# return executable +# +# #res = concatenate(concatenate(fop_pre, mop_func), fmop_func) +# #res_pro = res(["la", "lala"]) +# #for i in res_pro: +# # print(i) +# +# from pywy.platforms.python.mappings import PywyOperatorMappings +# from pywy.platforms.python.operators import * +# +# print(PywyOperatorMappings) +# +# pyF = PyFilterOperator() +# print(pyF) +# print(pyF.get_input_channeldescriptors()) +# print(type(pyF.get_input_channeldescriptors().pop().create_instance())) +# +# qq : Channel = pyF.get_input_channeldescriptors().pop().create_instance() +# print(qq) +# print(type(qq)) +# print("ads") +# +# +# def pre_lala(a:str): +# print("executed") +# return len(a) > 3 +# +# ou1 = filter(pre_lala, ["la", "lala"]) +# print(ou1) +# +# for i in ou1: +# print(i) +# +# pyFM = PywyOperatorMappings.get_instanceof(filterop) +# print(pyFM) +# print(type(pyFM)) \ No newline at end of file diff --git a/python/src/pywy/translate/translator.py b/python/src/pywy/translate/translator.py index 303d497b..8ec29399 100644 --- a/python/src/pywy/translate/translator.py +++ b/python/src/pywy/translate/translator.py @@ -1,10 +1,10 @@ from pywy.platforms.basic.plugin import Plugin -from pywy.wayangplan.wayang import WayangPlan +from pywy.wayangplan.wayang import PywyPlan from pywy.platforms.basic.mapping import Mapping class Translator: - def __init__(self, plugin: Plugin, plan: WayangPlan): + def __init__(self, plugin: Plugin, plan: PywyPlan): self.plugin = plugin self.plan = plan diff --git a/python/src/pywy/wayangplan/__init__.py b/python/src/pywy/wayangplan/__init__.py index dae59871..265e2d21 100644 --- a/python/src/pywy/wayangplan/__init__.py +++ b/python/src/pywy/wayangplan/__init__.py @@ -1,15 +1,13 @@ -from pywy.wayangplan.wayang import WayangPlan from pywy.wayangplan.base import WyOperator from pywy.wayangplan.sink import TextFileSink from pywy.wayangplan.source import TextFileSource from pywy.wayangplan.unary import FilterOperator, MapOperator, FlatmapOperator - +# __ALL__= [ - WayangPlan, - WyOperator, - TextFileSink, - TextFileSource, - FilterOperator, - MapOperator, - FlatmapOperator + WyOperator, + TextFileSink, + TextFileSource, + FilterOperator, +# MapOperator, +# FlatmapOperator ] \ No newline at end of file diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py index 90e0b622..61a4a5dc 100644 --- a/python/src/pywy/wayangplan/base.py +++ b/python/src/pywy/wayangplan/base.py @@ -5,9 +5,11 @@ class WyOperator: inputSlot : List[TypeVar] inputChannel : ChannelDescriptor + inputOperator: List['WyOperator'] inputs : int outputSlot : List[TypeVar] - OutputChannel: ChannelDescriptor + outputChannel: ChannelDescriptor + outputOperator: List['WyOperator'] outputs: int def __init__(self, @@ -22,8 +24,10 @@ class WyOperator: self.inputs = input_lenght self.outputSlot = output self.outputs = output_lenght + self.inputOperator = [None] * self.inputs + self.outputOperator = [None] * self.outputs - def validateInputs(self, vec): + def validate_inputs(self, vec): if len(vec) != self.inputs: raise Exception( "the inputs channel contains {} elements and need to have {}".format( @@ -31,7 +35,7 @@ class WyOperator: self.inputs ) ) - def validateOutputs(self, vec): + def validate_outputs(self, vec): if len(vec) != self.outputs: raise Exception( "the output channel contains {} elements and need to have {}".format( @@ -39,14 +43,18 @@ class WyOperator: self.inputs ) ) - def validateChannels(self, input, output): - self.validateInputs(input) - self.validateOutputs(output) + def validate_channels(self, input, output): + self.validate_inputs(input) + self.validate_outputs(output) - def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def connect(self, port:int, that: 'WyOperator', port_that:int): + self.outputOperator[port] = that + that.inputOperator[port_that] = self + + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: pass - def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: pass def __str__(self): diff --git a/python/src/pywy/wayangplan/sink.py b/python/src/pywy/wayangplan/sink.py index 89443248..01cb0e9e 100644 --- a/python/src/pywy/wayangplan/sink.py +++ b/python/src/pywy/wayangplan/sink.py @@ -1,20 +1,15 @@ +from typing import Any + +from pywy.types import GenericTco from pywy.wayangplan.base import WyOperator class SinkOperator(WyOperator): - - def __init__(self, name:str): - super().__init__(name, None, str, 0, 1) - - def __str__(self): - return super().__str__() - - def __repr__(self): - return super().__repr__() + pass class SinkUnaryOperator(SinkOperator): - def __init__(self, name:str): - super().__init__(name, None, str, 0, 1) + def __init__(self, name:str, input:GenericTco=Any): + super().__init__(name, input, None, 1, 0) def __str__(self): return super().__str__() diff --git a/python/src/pywy/wayangplan/source.py b/python/src/pywy/wayangplan/source.py index c9ae2ba1..7428b904 100644 --- a/python/src/pywy/wayangplan/source.py +++ b/python/src/pywy/wayangplan/source.py @@ -3,7 +3,13 @@ from pywy.wayangplan.base import WyOperator class SourceUnaryOperator(WyOperator): def __init__(self, name:str): - super().__init__(name, None, str, 0, 1) + super(SourceUnaryOperator, self).__init__( + name = name, + input = None, + output = str, + input_lenght = 0, + output_lenght = 1 + ) def __str__(self): return super().__str__() @@ -18,7 +24,7 @@ class TextFileSource(SourceUnaryOperator): path: str def __init__(self, path: str): - super().__init__('TextFileSource') + super(TextFileSource, self).__init__('TextFileSource') self.path = path def __str__(self): diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py index a08b572a..7da13e18 100644 --- a/python/src/pywy/wayangplan/wayang.py +++ b/python/src/pywy/wayangplan/wayang.py @@ -1,11 +1,102 @@ -from typing import Iterable +from typing import Iterable, Dict, Callable, NoReturn, List, Set from pywy.wayangplan.sink import SinkOperator -from pywy.platforms.basic.platform import Platform +from pywy.wayangplan.base import WyOperator +from pywy.platforms.basic.plugin import Plugin +class GraphNodeWayang: -class WayangPlan: + current: WyOperator + visited: bool - def __init__(self, platforms: Iterable[Platform], sinks: Iterable[SinkOperator]): - self.platforms = platforms + def __init__(self, op: WyOperator): + self.current = op + self.visited = False + + def successors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']: + if self.current is None or self.current.outputs == 0: + return [] + + def wrap(op:WyOperator): + if op is None: + return None; + if op not in created: + created[op] = GraphNodeWayang(op) + return created[op] + + adjacent = self.current.outputOperator + return map(wrap, adjacent) + + def predecessors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']: + print("predecessors") + print(self) + def wrap(op:WyOperator): + if op not in created: + created[op] = GraphNodeWayang(op) + return created[op] + + adjacent = self.current.inputOperator + return map(wrap, adjacent) + + def visit(self, parent: 'GraphNodeWayang', udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn], visit_status: bool = True): + if(self.visited == visit_status): + return + udf(self, parent) + self.visited = visit_status + +class GraphWayang: + + starting_nodes : List[GraphNodeWayang] + created_nodes : Dict[WyOperator, GraphNodeWayang] + + def __init__(self, plan:'PywyPlan'): + self.created_nodes = {} + self.starting_nodes = list() + for sink in plan.sinks: + tmp = GraphNodeWayang(sink) + self.starting_nodes.append(tmp) + self.created_nodes[sink] = tmp + + + def traversal( + self, + origin: GraphNodeWayang, + nodes: Iterable[GraphNodeWayang], + udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn] + ): + for node in nodes: + adjacents = node.predecessors(self.created_nodes) + self.traversal(node, adjacents, udf) + node.visit(origin, udf) + +class PywyPlan: + + graph: GraphWayang + + def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]): + self.plugins = plugins self.sinks = sinks + self.graph = GraphWayang(self) + + def print(self): + def print_plan(current: GraphNodeWayang, previous: GraphNodeWayang): + if current is None: + print("this is source") + print(previous.current) + return + if previous is None: + print("this is sink") + print(current.current) + return + + print( + "===========\n{}\n@@@@@ => previous is\n{}\n===========\n" + .format( + current.current, + previous.current + ) + ) + self.graph.traversal(None, self.graph.starting_nodes, print_plan) + + +
