This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 68069a50b2c7808048bca8b4557cfed7a5b73fe1 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 18:07:45 2022 +0200 [WAYANG-#8] Translate the operator to execute Signed-off-by: bertty <[email protected]> --- python/src/pywy/dataquanta.py | 15 +++++--- python/src/pywy/graph/graph.py | 7 ++-- python/src/pywy/graph/graphtypes.py | 24 ++++++++----- python/src/pywy/platforms/basic/mapping.py | 4 +-- python/src/pywy/platforms/basic/plugin.py | 10 +++--- python/src/pywy/platforms/python/plugin/plugin.py | 2 +- python/src/pywy/test.py | 44 +++++++++++------------ python/src/pywy/translate/translator.py | 44 +++++++++++++++++++++++ python/src/pywy/wayangplan/base.py | 5 +++ python/src/pywy/wayangplan/wayang.py | 6 ++-- 10 files changed, 113 insertions(+), 48 deletions(-) diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index ce9d1a9d..f44540e9 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -1,3 +1,6 @@ +from typing import Set + +from pywy.translate.translator import Translator from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO) from pywy.wayangplan import * from pywy.wayangplan.wayang import PywyPlan @@ -7,13 +10,15 @@ class WayangContext: """ This is the entry point for users to work with Wayang. """ + plugins: Set[Plugin] + def __init__(self): self.plugins = set() """ add a :class:`Plugin` to the :class:`Context` """ - def register(self, *p: Plugin): + def register(self, p: Plugin): self.plugins.add(p) return self @@ -28,7 +33,7 @@ class WayangContext: return DataQuanta(self, TextFileSource(file_path)) def __str__(self): - return "Plugins: {} \n".format(str(self.plugins)) + return "Plugins: {}".format(str(self.plugins)) def __repr__(self): return self.__str__() @@ -38,6 +43,7 @@ class DataQuanta(GenericTco): Represents an intermediate result/data flow edge in a [[WayangPlan]]. """ previous : WyOperator = None + context: WayangContext def __init__(self, context:WayangContext, operator: WyOperator): self.operator = operator @@ -55,8 +61,9 @@ class DataQuanta(GenericTco): def storeTextFile(self: "DataQuanta[I]", path: str) : last = self.__connect(TextFileSink(path)) plan = PywyPlan(self.context.plugins, [last]) - #plan.print() - plan.printTuple() + + trs: Translator = Translator(self.context.plugins.pop(), plan) + trs.translate() # TODO add the logic to execute the plan def __connect(self, op:WyOperator, port_op: int = 0) -> WyOperator: diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py index c15c86e5..6e0a2d08 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/graph/graph.py @@ -35,7 +35,7 @@ class GraphNode(Generic[T]): def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True): if(self.visited == visit_status): return - self.visited = visit_status + self.visited != visit_status return udf(self, parent) @@ -59,9 +59,10 @@ class WayangGraph(Generic[T]): self, origin: GraphNode[T], nodes: Iterable[GraphNode[T]], - udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any] + udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], + visit_status: bool = True ): for node in nodes: adjacents = node.adjacents(self.created_nodes) - self.traversal(node, adjacents, udf) + self.traversal(node, adjacents, udf, visit_status) node.visit(origin, udf) \ No newline at end of file diff --git a/python/src/pywy/graph/graphtypes.py b/python/src/pywy/graph/graphtypes.py index 005ef5f4..cef0ff06 100644 --- a/python/src/pywy/graph/graphtypes.py +++ b/python/src/pywy/graph/graphtypes.py @@ -26,24 +26,30 @@ class WGraphOfOperator(WayangGraph[NodeOperator]): return NodeOperator(t) -class NodeTuple(GraphNode[Tuple[WyOperator, WyOperator]]): +class NodeVec(GraphNode[List[WyOperator]]): def __init__(self, op: WyOperator): - super(NodeTuple, self).__init__((op, None)) + super(NodeVec, self).__init__([op, None]) - def getadjacents(self) -> Iterable[Tuple[WyOperator, WyOperator]]: + def getadjacents(self) -> Iterable[List[WyOperator]]: operator: WyOperator = self.current[0] if operator is None or operator.inputs == 0: return [] return operator.inputOperator - def build_node(self, t:WyOperator) -> 'NodeTuple': - return NodeTuple(t) + def build_node(self, t:WyOperator) -> 'NodeVec': + return NodeVec(t) -class WGraphOfTuple(WayangGraph[NodeTuple]): + def __str__(self): + return "NodeVec {}".format(self.current) + + def __repr__(self): + return self.__str__() + +class WGraphOfVec(WayangGraph[NodeVec]): def __init__(self, nodes: List[WyOperator]): - super(WGraphOfTuple, self).__init__(nodes) + super(WGraphOfVec, self).__init__(nodes) - def build_node(self, t:WyOperator) -> NodeTuple: - return NodeTuple(t) \ No newline at end of file + def build_node(self, t:WyOperator) -> NodeVec: + return NodeVec(t) \ No newline at end of file diff --git a/python/src/pywy/platforms/basic/mapping.py b/python/src/pywy/platforms/basic/mapping.py index 63c7e4ca..e8f14a20 100644 --- a/python/src/pywy/platforms/basic/mapping.py +++ b/python/src/pywy/platforms/basic/mapping.py @@ -8,10 +8,10 @@ class Mapping: self.mappings = {} def add_mapping(self, operator: WyOperator): - self.mappings[operator.name] = type(operator) + self.mappings[operator.name_basic()] = type(operator) def get_instanceof(self, operator: WyOperator): - template = self.mappings[operator.name] + template = self.mappings[operator.name_basic()] if template is None: raise Exception( "the operator {} does not have valid mapping".format( diff --git a/python/src/pywy/platforms/basic/plugin.py b/python/src/pywy/platforms/basic/plugin.py index 9176e9f3..88da7f73 100644 --- a/python/src/pywy/platforms/basic/plugin.py +++ b/python/src/pywy/platforms/basic/plugin.py @@ -1,3 +1,5 @@ +from typing import List, Set + from pywy.platforms.basic.platform import Platform from pywy.platforms.basic.mapping import Mapping @@ -10,18 +12,18 @@ class Plugin: In turn, it may require several :clas:`Platform`s for its operation. """ - platforms = [] + platforms: Set[Platform] mappings: Mapping - def __init__(self, *platform:Platform, mappings: Mapping = Mapping()): - self.platforms = list(platform) + def __init__(self, platforms:Set[Platform], mappings: Mapping = Mapping()): + self.platforms = platforms self.mappings = mappings def get_mappings(self) -> Mapping: return self.mappings def __str__(self): - return "Platforms: {}".format(str(self.platforms)) + return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings)) def __repr__(self): return self.__str__() diff --git a/python/src/pywy/platforms/python/plugin/plugin.py b/python/src/pywy/platforms/python/plugin/plugin.py index 0f870029..0d42db7a 100644 --- a/python/src/pywy/platforms/python/plugin/plugin.py +++ b/python/src/pywy/platforms/python/plugin/plugin.py @@ -7,4 +7,4 @@ from pywy.platforms.python.mappings import PywyOperatorMappings class PythonPlugin(Plugin): def __init__(self): - super(PythonPlugin, self).__init__(PythonPlatform(), PywyOperatorMappings) \ No newline at end of file + super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings) \ No newline at end of file diff --git a/python/src/pywy/test.py b/python/src/pywy/test.py index de0153d6..2de508e6 100644 --- a/python/src/pywy/test.py +++ b/python/src/pywy/test.py @@ -1,37 +1,37 @@ from pywy.platforms.basic.platform import Platform from pywy.dataquanta import WayangContext from pywy.platforms.python.channels import Channel -from pywy.plugins import java, spark +from pywy.plugins import java, spark, python from pywy.wayangplan.unary import * -p = Platform("nana") -print("LALA "+str(p)) -pt = type(p) -print(pt) -p2 = pt("chao") -print(p2) -print(type(p2)) - - -print(str(WayangContext().register(java, spark))) +# p = Platform("nana") +# print("LALA "+str(p)) +# pt = type(p) +# print(pt) +# p2 = pt("chao") +# print(p2) +# print(type(p2)) +# +# +# print(str(WayangContext().register(java, spark))) from pywy.types import Predicate, getTypePredicate - -predicate : Predicate = lambda x: x % 2 == 0 -getTypePredicate(predicate) +# +# predicate : Predicate = lambda x: x % 2 == 0 +# getTypePredicate(predicate) def pre(a:str): return len(a) > 3 - -def func(s:str) -> int: - return len(s) - -def fmfunc(i:int) -> str: - for x in range(i): - yield str(x) +# +# def func(s:str) -> int: +# return len(s) +# +# def fmfunc(i:int) -> str: +# for x in range(i): +# yield str(x) fileop = WayangContext()\ - .register(java)\ + .register(python)\ .textFile("/Users/bertty/databloom/blossom/python/resources/test.input")\ .filter(pre)\ .storeTextFile("/Users/bertty/databloom/blossom/python/resources/test.output") diff --git a/python/src/pywy/translate/translator.py b/python/src/pywy/translate/translator.py index 8ec29399..bb646c4f 100644 --- a/python/src/pywy/translate/translator.py +++ b/python/src/pywy/translate/translator.py @@ -1,12 +1,56 @@ +from pywy.graph.graphtypes import WGraphOfVec, NodeVec from pywy.platforms.basic.plugin import Plugin +from pywy.wayangplan import WyOperator from pywy.wayangplan.wayang import PywyPlan from pywy.platforms.basic.mapping import Mapping class Translator: + plugin: Plugin + plan : PywyPlan + def __init__(self, plugin: Plugin, plan: PywyPlan): self.plugin = plugin self.plan = plan def translate(self): mappings:Mapping = self.plugin.get_mappings() + graph = WGraphOfVec(self.plan.sinks) + def translate2plugin(current: NodeVec, previous: NodeVec): + if current is None: + return + + if current.current[1] is None: + current.current[1] = mappings.get_instanceof(current.current[0]) + + if previous is None: + return + if previous.current[1] is None: + previous.current[1] = mappings.get_instanceof(previous.current[0]) + + # TODO not necesary it it 0 + current.current[1].connect(0, previous.current[1], 0) + + graph.traversal(None, graph.starting_nodes, translate2plugin) + + def print_plan(current: NodeVec, previous: NodeVec): + if current is None: + print("this is source") + print(previous.current) + return + if previous is None: + print("this is sink") + print(current.current) + return + + print( + "############\n{}\n@@@@@ => previous is\n{}\n############\n" + .format( + current.current, + previous.current + ) + ) + + graph.traversal(None, graph.starting_nodes, print_plan, False) + print("here") + diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py index a87a2f6a..92d4dfca 100644 --- a/python/src/pywy/wayangplan/base.py +++ b/python/src/pywy/wayangplan/base.py @@ -63,6 +63,11 @@ class WyOperator: def postfix(self) -> str: return '' + def name_basic(self, with_prefix: bool = False, with_postfix:bool = True): + prefix = len(self.prefix()) if not with_prefix else 0 + postfix = len(self.postfix()) if not with_postfix else 0 + return self.name[prefix:len(self.name) - postfix] + def __str__(self): return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format( str(self.name), diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py index 220f4416..c4d8205d 100644 --- a/python/src/pywy/wayangplan/wayang.py +++ b/python/src/pywy/wayangplan/wayang.py @@ -1,7 +1,7 @@ from typing import Iterable, Set from pywy.graph.graph import WayangGraph -from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator, WGraphOfTuple, NodeTuple +from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator, WGraphOfVec, NodeVec from pywy.wayangplan.sink import SinkOperator from pywy.platforms.basic.plugin import Plugin @@ -16,7 +16,7 @@ class PywyPlan: self.set_graph() def set_graph(self): - self.graph = WGraphOfTuple(self.sinks) + self.graph = WGraphOfVec(self.sinks) def print(self): def print_plan(current: NodeOperator, previous: NodeOperator): @@ -40,7 +40,7 @@ class PywyPlan: def printTuple(self): - def print_plan(current: NodeTuple, previous: NodeTuple): + def print_plan(current: NodeVec, previous: NodeVec): if current is None: print("this is source") print(previous.current)
