This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit bff9f575f244cda75bd5b05f38f5c3224ca2ec24 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 16:39:17 2022 +0200 [WAYANG-#8] Change structure of Graph Signed-off-by: bertty <[email protected]> --- python/src/pywy/__init__.py | 2 +- python/src/pywy/graph/__init__.py | 19 ---- python/src/pywy/graph/graph.py | 124 +++++++++++----------- python/src/pywy/graph/graphtypes.py | 26 +++++ python/src/pywy/{graph => old_graph}/__init__.py | 0 python/src/pywy/{graph => old_graph}/graph.py | 2 +- python/src/pywy/{graph => old_graph}/node.py | 0 python/src/pywy/{graph => old_graph}/traversal.py | 2 +- python/src/pywy/{graph => old_graph}/visitant.py | 0 python/src/pywy/orchestrator/dataquanta.py | 4 +- python/src/pywy/wayangplan/wayang.py | 78 ++------------ 11 files changed, 100 insertions(+), 157 deletions(-) diff --git a/python/src/pywy/__init__.py b/python/src/pywy/__init__.py index 38c001b3..39f92eb3 100644 --- a/python/src/pywy/__init__.py +++ b/python/src/pywy/__init__.py @@ -18,5 +18,5 @@ from .config import * from .orchestrator import * from pywy.translate.protobuf import * -from .graph import * +from .old_graph import * from .test import * \ No newline at end of file diff --git a/python/src/pywy/graph/__init__.py b/python/src/pywy/graph/__init__.py index 8066b5ee..e69de29b 100644 --- a/python/src/pywy/graph/__init__.py +++ b/python/src/pywy/graph/__init__.py @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -#import graph.graph -#import graph.node \ No newline at end of file diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py index a66787fa..c15c86e5 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/graph/graph.py @@ -1,71 +1,67 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from pywy.graph.node import Node -import logging - - -# Adjacency Matrix used to analise the plan -class Graph: - def __init__(self): - self.graph = {} - self.nodes_no = 0 - self.nodes = [] - - # Fills the Graph - def populate(self, sinks): - for sink in iter(sinks): - self.process_operator(sink) - - # Add current operator and set dependencies - def process_operator(self, operator): - self.add_node(operator.operator_type, operator.id, operator) - - if len(operator.previous) > 0: - for parent in operator.previous: - if parent: - self.add_node(parent.operator_type, parent.id, parent) - self.add_link(operator.id, parent.id, 1) - self.process_operator(parent) - - def add_node(self, name, id, operator): - if id in self.nodes: +from pywy.types import T +from typing import Iterable, Dict, Callable, List, Any, Generic + + +class GraphNode(Generic[T]): + + current: T + visited: bool + + def __init__(self, op: T): + self.current = op + self.visited = False + + def getadjacents(self) -> Iterable[T]: + pass + + def build_node(self, t:T) -> 'GraphNode[T]': + pass + + def adjacents(self, created: Dict[T, 'GraphNode[T]']) -> Iterable['GraphNode[T]']: + adjacent = self.getadjacents() + + if len(adjacent) == 0: + return [] + + def wrap(op:T): + if op is None: + return None + if op not in created: + created[op] = self.build_node(op) + return created[op] + + return map(wrap, adjacent) + + def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True): + if(self.visited == visit_status): return + self.visited = visit_status + return udf(self, parent) - self.nodes_no += 1 - self.nodes.append(id) - new_node = Node(name, id, operator) - self.graph[id] = new_node +class WayangGraph(Generic[T]): - def add_link(self, id_child, id_parent, e): - if id_child in self.nodes: - if id_parent in self.nodes: - self.graph[id_child].add_predecessor(id_parent, e) - self.graph[id_parent].add_successor(id_child, e) + starting_nodes : List[GraphNode[T]] + created_nodes : Dict[T, GraphNode[T]] - def print_adjlist(self): + def __init__(self, nodes: List[T]): + self.created_nodes = {} + self.starting_nodes = list() + for node in nodes: + tmp = self.build_node(node) + self.starting_nodes.append(tmp) + self.created_nodes[node] = tmp - for key in self.graph: - logging.debug("Node: ", self.graph[key].operator_type, " - ", key) - for key2 in self.graph[key].predecessors: - logging.debug("- Parent: ", self.graph[key2].operator_type, " - ", self.graph[key].predecessors[key2], " - ", key2) - for key2 in self.graph[key].successors: - logging.debug("- Child: ", self.graph[key2].operator_type, " - ", self.graph[key].successors[key2], " - ", key2) + def build_node(self, t:T) -> GraphNode[T]: + pass - def get_node(self, id): - return self.graph[id] + def traversal( + self, + origin: GraphNode[T], + nodes: Iterable[GraphNode[T]], + udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any] + ): + for node in nodes: + adjacents = node.adjacents(self.created_nodes) + self.traversal(node, adjacents, udf) + node.visit(origin, udf) \ No newline at end of file diff --git a/python/src/pywy/graph/graphtypes.py b/python/src/pywy/graph/graphtypes.py new file mode 100644 index 00000000..d31b4cde --- /dev/null +++ b/python/src/pywy/graph/graphtypes.py @@ -0,0 +1,26 @@ +from typing import Iterable, List + +from pywy.graph.graph import GraphNode, WayangGraph +from pywy.wayangplan.base import WyOperator + +class WayangNode(GraphNode[WyOperator]): + + def __init__(self, op: WyOperator): + super(WayangNode, self).__init__(op) + + def getadjacents(self) -> Iterable[WyOperator]: + operator: WyOperator = self.current + if operator is None or operator.inputs == 0: + return [] + return operator.inputOperator + + def build_node(self, t:WyOperator) -> 'WayangNode': + return WayangNode(t) + +class WayangGraphOfWayangNode(WayangGraph[WayangNode]): + + def __init__(self, nodes: List[WyOperator]): + super(WayangGraphOfWayangNode, self).__init__(nodes) + + def build_node(self, t:WyOperator) -> WayangNode: + return WayangNode(t) diff --git a/python/src/pywy/graph/__init__.py b/python/src/pywy/old_graph/__init__.py similarity index 100% copy from python/src/pywy/graph/__init__.py copy to python/src/pywy/old_graph/__init__.py diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/old_graph/graph.py similarity index 98% copy from python/src/pywy/graph/graph.py copy to python/src/pywy/old_graph/graph.py index a66787fa..13846c48 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/old_graph/graph.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pywy.graph.node import Node +from pywy.old_graph.node import Node import logging diff --git a/python/src/pywy/graph/node.py b/python/src/pywy/old_graph/node.py similarity index 100% rename from python/src/pywy/graph/node.py rename to python/src/pywy/old_graph/node.py diff --git a/python/src/pywy/graph/traversal.py b/python/src/pywy/old_graph/traversal.py similarity index 97% rename from python/src/pywy/graph/traversal.py rename to python/src/pywy/old_graph/traversal.py index 63542a4e..a2714145 100644 --- a/python/src/pywy/graph/traversal.py +++ b/python/src/pywy/old_graph/traversal.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pywy.graph.visitant import Visitant +from pywy.old_graph.visitant import Visitant import logging diff --git a/python/src/pywy/graph/visitant.py b/python/src/pywy/old_graph/visitant.py similarity index 100% rename from python/src/pywy/graph/visitant.py rename to python/src/pywy/old_graph/visitant.py diff --git a/python/src/pywy/orchestrator/dataquanta.py b/python/src/pywy/orchestrator/dataquanta.py index 4e5a5661..8c4468b2 100644 --- a/python/src/pywy/orchestrator/dataquanta.py +++ b/python/src/pywy/orchestrator/dataquanta.py @@ -16,8 +16,8 @@ # from pywy.orchestrator.operator import Operator -from pywy.graph.graph import Graph -from pywy.graph.traversal import Traversal +from pywy.old_graph.graph import Graph +from pywy.old_graph.traversal import Traversal from pywy.translate.protobuf.planwriter import MessageWriter import itertools import collections diff --git a/python/src/pywy/wayangplan/wayang.py b/python/src/pywy/wayangplan/wayang.py index 7da13e18..eacad67c 100644 --- a/python/src/pywy/wayangplan/wayang.py +++ b/python/src/pywy/wayangplan/wayang.py @@ -1,85 +1,25 @@ -from typing import Iterable, Dict, Callable, NoReturn, List, Set +from typing import Iterable, Set +from pywy.graph.graph import WayangGraph +from pywy.graph.graphtypes import WayangGraphOfWayangNode, WayangNode from pywy.wayangplan.sink import SinkOperator -from pywy.wayangplan.base import WyOperator from pywy.platforms.basic.plugin import Plugin -class GraphNodeWayang: - - current: WyOperator - visited: bool - - def __init__(self, op: WyOperator): - self.current = op - self.visited = False - - def successors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']: - if self.current is None or self.current.outputs == 0: - return [] - - def wrap(op:WyOperator): - if op is None: - return None; - if op not in created: - created[op] = GraphNodeWayang(op) - return created[op] - - adjacent = self.current.outputOperator - return map(wrap, adjacent) - - def predecessors(self, created: Dict[WyOperator, 'GraphNodeWayang']) -> Iterable['GraphNodeWayang']: - print("predecessors") - print(self) - def wrap(op:WyOperator): - if op not in created: - created[op] = GraphNodeWayang(op) - return created[op] - - adjacent = self.current.inputOperator - return map(wrap, adjacent) - - def visit(self, parent: 'GraphNodeWayang', udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn], visit_status: bool = True): - if(self.visited == visit_status): - return - udf(self, parent) - self.visited = visit_status - -class GraphWayang: - - starting_nodes : List[GraphNodeWayang] - created_nodes : Dict[WyOperator, GraphNodeWayang] - - def __init__(self, plan:'PywyPlan'): - self.created_nodes = {} - self.starting_nodes = list() - for sink in plan.sinks: - tmp = GraphNodeWayang(sink) - self.starting_nodes.append(tmp) - self.created_nodes[sink] = tmp - - - def traversal( - self, - origin: GraphNodeWayang, - nodes: Iterable[GraphNodeWayang], - udf: Callable[['GraphNodeWayang', 'GraphNodeWayang'], NoReturn] - ): - for node in nodes: - adjacents = node.predecessors(self.created_nodes) - self.traversal(node, adjacents, udf) - node.visit(origin, udf) class PywyPlan: - graph: GraphWayang + graph: WayangGraph def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]): self.plugins = plugins self.sinks = sinks - self.graph = GraphWayang(self) + self.set_graph() + + def set_graph(self): + self.graph = WayangGraphOfWayangNode(self.sinks) def print(self): - def print_plan(current: GraphNodeWayang, previous: GraphNodeWayang): + def print_plan(current: WayangNode, previous: WayangNode): if current is None: print("this is source") print(previous.current)
