This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit f32d04a6e06ac0c9eff5e4a2d94d264700faa3c5 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 21:39:30 2022 +0200 [WAYANG-#8] Executor in python platformb Signed-off-by: bertty <[email protected]> --- python/src/pywy/dataquanta.py | 6 ++- python/src/pywy/platforms/basic/executor.py | 9 ++++ python/src/pywy/platforms/basic/plugin.py | 4 ++ python/src/pywy/platforms/basic/translator.py | 56 ++++++++++--------- .../pywy/platforms/python/execution/executor.py | 62 ++++++++++++++++++++++ ...ExecutionOperator.py => PyExecutionOperator.py} | 2 +- .../platforms/python/operators/PyFilterOperator.py | 4 +- .../python/operators/PyTextFileSinkOperator.py | 4 +- .../python/operators/PyTextFileSourceOperator.py | 4 +- .../pywy/platforms/python/operators/__init__.py | 4 +- python/src/pywy/platforms/python/plugin/plugin.py | 7 ++- python/src/pywy/wayangplan/base.py | 10 ++-- 12 files changed, 131 insertions(+), 41 deletions(-) diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index d063cbda..14019423 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -62,8 +62,10 @@ class DataQuanta(GenericTco): last = self.__connect(TextFileSink(path)) plan = PywyPlan(self.context.plugins, [last]) - trs: Translator = Translator(self.context.plugins.pop(), plan) - trs.translate() + plug = self.context.plugins.pop() + trs: Translator = Translator(plug, plan) + new_plan = trs.translate() + plug.get_executor().execute(new_plan) # TODO add the logic to execute the plan def __connect(self, op:PywyOperator, port_op: int = 0) -> PywyOperator: diff --git a/python/src/pywy/platforms/basic/executor.py b/python/src/pywy/platforms/basic/executor.py new file mode 100644 index 00000000..31fcb2c7 --- /dev/null +++ b/python/src/pywy/platforms/basic/executor.py @@ -0,0 +1,9 @@ + + +class Executor: + + def __init__(self): + pass + + def execute(self, plan): + pass \ No newline at end of file diff --git a/python/src/pywy/platforms/basic/plugin.py b/python/src/pywy/platforms/basic/plugin.py index 1f156d9d..838d88a4 100644 --- a/python/src/pywy/platforms/basic/plugin.py +++ b/python/src/pywy/platforms/basic/plugin.py @@ -1,5 +1,6 @@ from typing import Set +from pywy.platforms.basic.executor import Executor from pywy.platforms.basic.platform import Platform from pywy.platforms.basic.mapping import Mapping @@ -22,6 +23,9 @@ class Plugin: def get_mappings(self) -> Mapping: return self.mappings + def get_executor(self) -> Executor: + pass + def __str__(self): return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings)) diff --git a/python/src/pywy/platforms/basic/translator.py b/python/src/pywy/platforms/basic/translator.py index d7d545e2..6031f7a2 100644 --- a/python/src/pywy/platforms/basic/translator.py +++ b/python/src/pywy/platforms/basic/translator.py @@ -15,41 +15,45 @@ class Translator: def translate(self): mappings:Mapping = self.plugin.get_mappings() graph = WGraphOfVec(self.plan.sinks) - def translate2plugin(current: NodeVec, previous: NodeVec): + def translate2plugin(current: NodeVec, next: NodeVec): if current is None: return if current.current[1] is None: current.current[1] = mappings.get_instanceof(current.current[0]) - if previous is None: + if next is None: return - if previous.current[1] is None: - previous.current[1] = mappings.get_instanceof(previous.current[0]) + if next.current[1] is None: + next.current[1] = mappings.get_instanceof(next.current[0]) # TODO not necesary it it 0 - current.current[1].connect(0, previous.current[1], 0) + current.current[1].connect(0, next.current[1], 0) graph.traversal(None, graph.starting_nodes, translate2plugin) - def print_plan(current: NodeVec, previous: NodeVec): - if current is None: - print("this is source") - print(previous.current) - return - if previous is None: - print("this is sink") - print(current.current) - return - - print( - "############\n{}\n@@@@@ => previous is\n{}\n############\n" - .format( - current.current, - previous.current - ) - ) - - graph.traversal(None, graph.starting_nodes, print_plan, False) - print("here") - + # def print_plan(current: NodeVec, previous: NodeVec): + # if current is None: + # print("this is source") + # print(previous.current) + # return + # if previous is None: + # print("this is sink") + # print(current.current) + # return + # + # print( + # "############\n{}\n@@@@@ => previous is\n{}\n############\n" + # .format( + # current.current, + # previous.current + # ) + # ) + # + # graph.traversal(None, graph.starting_nodes, print_plan, False) + + node = [] + for elem in graph.starting_nodes: + node.append(elem.current[1]) + + return PywyPlan(self.plugin, node) \ No newline at end of file diff --git a/python/src/pywy/platforms/python/execution/executor.py b/python/src/pywy/platforms/python/execution/executor.py new file mode 100644 index 00000000..fa17ae8f --- /dev/null +++ b/python/src/pywy/platforms/python/execution/executor.py @@ -0,0 +1,62 @@ +from typing import List + +from pywy.graph.graphtypes import WGraphOfOperator, NodeOperator +from pywy.platforms.basic.channel import Channel +from pywy.platforms.basic.executor import Executor +from pywy.platforms.basic.plan import PywyPlan +from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator + + +class PyExecutor(Executor): + + def __init__(self): + super(PyExecutor, self).__init__() + + def execute(self, plan): + pywyPlan: PywyPlan = plan + graph = WGraphOfOperator(pywyPlan.sinks) + + def exec(current: NodeOperator, next: NodeOperator): + if current is None: + return + + py_current: PyExecutionOperator = current.current + if py_current.outputs == 0: + py_current.execute(py_current.inputChannel, []) + return + + if next is None: + return + py_next: PyExecutionOperator = next.current + outputs = py_current.get_output_channeldescriptors() + inputs = py_next.get_input_channeldescriptors() + + intersect = outputs.intersection(inputs) + if len(intersect) == 0: + raise Exception( + "The operator(A) {} can't connect with (B) {}, because the output of (A) is {} and the input of (B) is {}" + .format( + py_current, + py_next, + outputs, + inputs + ) + ) + if len(intersect) > 1: + raise Exception( + "The interaction between the operator (A) {} and (B) {}, can't be decided because are several channel availables {}" + .format( + py_current, + py_next, + intersect + ) + ) + #TODO validate if is valite for several output + py_current.outputChannel: List[Channel] = [intersect.pop().create_instance()] + + py_current.execute(py_current.inputChannel, py_current.outputChannel) + + py_next.inputChannel = py_current.outputChannel + + + graph.traversal(None, graph.starting_nodes, exec) \ No newline at end of file diff --git a/python/src/pywy/platforms/python/operators/PythonExecutionOperator.py b/python/src/pywy/platforms/python/operators/PyExecutionOperator.py similarity index 78% rename from python/src/pywy/platforms/python/operators/PythonExecutionOperator.py rename to python/src/pywy/platforms/python/operators/PyExecutionOperator.py index 2db44f03..a9d4ebd5 100644 --- a/python/src/pywy/platforms/python/operators/PythonExecutionOperator.py +++ b/python/src/pywy/platforms/python/operators/PyExecutionOperator.py @@ -1,7 +1,7 @@ from pywy.wayangplan.base import PywyOperator from pywy.platforms.python.channels import Channel -class PythonExecutionOperator(PywyOperator): +class PyExecutionOperator(PywyOperator): def prefix(self) -> str: return 'Py' diff --git a/python/src/pywy/platforms/python/operators/PyFilterOperator.py b/python/src/pywy/platforms/python/operators/PyFilterOperator.py index 7c0bbf7e..7d6a503c 100644 --- a/python/src/pywy/platforms/python/operators/PyFilterOperator.py +++ b/python/src/pywy/platforms/python/operators/PyFilterOperator.py @@ -1,6 +1,6 @@ from typing import Set from pywy.wayangplan.unary import FilterOperator -from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator from pywy.platforms.python.channels import ( Channel, ChannelDescriptor, @@ -11,7 +11,7 @@ from pywy.platforms.python.channels import ( ) -class PyFilterOperator(FilterOperator, PythonExecutionOperator): +class PyFilterOperator(FilterOperator, PyExecutionOperator): def __init__(self, origin: FilterOperator = None): predicate = None if origin is None else origin.predicate diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py index 6d9ffaae..5a67cfaf 100644 --- a/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py +++ b/python/src/pywy/platforms/python/operators/PyTextFileSinkOperator.py @@ -1,6 +1,6 @@ from typing import Set from pywy.wayangplan.sink import TextFileSink -from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator from pywy.platforms.python.channels import ( Channel, ChannelDescriptor, @@ -9,7 +9,7 @@ from pywy.platforms.python.channels import ( ) -class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator): +class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): def __init__(self, origin: TextFileSink = None): path = None if origin is None else origin.path diff --git a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py index 96d9f96d..bac1f844 100644 --- a/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py +++ b/python/src/pywy/platforms/python/operators/PyTextFileSourceOperator.py @@ -1,6 +1,6 @@ from typing import Set from pywy.wayangplan.source import TextFileSource -from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator from pywy.platforms.python.channels import ( Channel, ChannelDescriptor, @@ -9,7 +9,7 @@ from pywy.platforms.python.channels import ( ) -class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator): +class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator): def __init__(self, origin: TextFileSource = None): path = None if origin is None else origin.path diff --git a/python/src/pywy/platforms/python/operators/__init__.py b/python/src/pywy/platforms/python/operators/__init__.py index 5ddf61ae..2f7f3ca1 100644 --- a/python/src/pywy/platforms/python/operators/__init__.py +++ b/python/src/pywy/platforms/python/operators/__init__.py @@ -1,10 +1,10 @@ -from pywy.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywy.platforms.python.operators.PyExecutionOperator import PyExecutionOperator from pywy.platforms.python.operators.PyFilterOperator import PyFilterOperator from pywy.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator from pywy.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator __ALL__ = [ - PythonExecutionOperator, + PyExecutionOperator, PyFilterOperator, PyTextFileSourceOperator, PyTextFileSinkOperator diff --git a/python/src/pywy/platforms/python/plugin/plugin.py b/python/src/pywy/platforms/python/plugin/plugin.py index 010c49cd..34814638 100644 --- a/python/src/pywy/platforms/python/plugin/plugin.py +++ b/python/src/pywy/platforms/python/plugin/plugin.py @@ -1,3 +1,5 @@ +from pywy.platforms.basic.executor import Executor +from pywy.platforms.python.execution.executor import PyExecutor from pywy.platforms.python.platform import PythonPlatform from pywy.platforms.basic.plugin import Plugin from pywy.platforms.python.mappings import PywyOperatorMappings @@ -6,4 +8,7 @@ from pywy.platforms.python.mappings import PywyOperatorMappings class PythonPlugin(Plugin): def __init__(self): - super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings) \ No newline at end of file + super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings) + + def get_executor(self) -> Executor: + return PyExecutor() \ No newline at end of file diff --git a/python/src/pywy/wayangplan/base.py b/python/src/pywy/wayangplan/base.py index 1a81052a..ebe2e2ff 100644 --- a/python/src/pywy/wayangplan/base.py +++ b/python/src/pywy/wayangplan/base.py @@ -1,14 +1,16 @@ from typing import ( TypeVar, Optional, List, Set ) -from pywy.platforms.basic.channel import ChannelDescriptor +from pywy.platforms.basic.channel import ChannelDescriptor, Channel class PywyOperator: inputSlot : List[TypeVar] - inputChannel : ChannelDescriptor + inputChannel : List[Channel] + inputChannelDescriptor : List[ChannelDescriptor] inputOperator: List['PywyOperator'] inputs : int outputSlot : List[TypeVar] - outputChannel: ChannelDescriptor + outputChannel: List[Channel] + outputChannelDescriptor: List[ChannelDescriptor] outputOperator: List['PywyOperator'] outputs: int @@ -26,6 +28,8 @@ class PywyOperator: self.outputs = output_lenght self.inputOperator = [None] * self.inputs self.outputOperator = [None] * self.outputs + self.inputChannel = [None] * self.inputs + self.outputChannel = [None] * self.outputs def validate_inputs(self, vec): if len(vec) != self.inputs:
