This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch wayang-211 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit f738e66fd4db66b08c2c0e67f35ca3101c9a3bf5 Author: bertty Contreras <[email protected]> AuthorDate: Tue Apr 12 00:32:13 2022 +0200 [WAYANG-#211] JVM-Platform translator almost done Signed-off-by: bertty <[email protected]> --- python/src/pywy/core/__init__.py | 4 +- python/src/pywy/core/core.py | 174 +++++++++++++++++++++ python/src/pywy/core/mapping.py | 4 +- python/src/pywy/core/plan.py | 107 ------------- python/src/pywy/core/plugin.py | 51 ------ python/src/pywy/core/translator.py | 58 ------- python/src/pywy/dataquanta.py | 12 +- python/src/pywy/operators/base.py | 4 +- .../jvm_execution_operator.py => context.py} | 20 +-- python/src/pywy/platforms/jvm/execution.py | 14 +- .../jvm/operator/jvm_execution_operator.py | 15 ++ .../platforms/jvm/operator/jvm_sink_textfile.py | 6 +- .../platforms/jvm/operator/jvm_source_textfile.py | 6 +- .../platforms/jvm/operator/jvm_unary_filter.py | 5 +- python/src/pywy/platforms/jvm/plugin.py | 5 +- .../platforms/jvm/serializable/plan_writter.py | 131 ++++++++++++++++ .../jvm/serializable/wayang_jvm_operator.py | 18 +++ .../platforms/python/operator/py_sink_textfile.py | 2 +- .../python/operator/py_source_textfile.py | 2 +- .../platforms/python/operator/py_unary_filter.py | 2 +- .../platforms/python/operator/py_unary_flatmap.py | 2 +- .../pywy/platforms/python/operator/py_unary_map.py | 2 +- .../pywy/tests/integration/jvm_platform_test.py | 1 - 23 files changed, 379 insertions(+), 266 deletions(-) diff --git a/python/src/pywy/core/__init__.py b/python/src/pywy/core/__init__.py index a28e3fcd..f9be82ca 100644 --- a/python/src/pywy/core/__init__.py +++ b/python/src/pywy/core/__init__.py @@ -18,10 +18,8 @@ from pywy.core.channel import Channel, ChannelDescriptor from pywy.core.executor import Executor from pywy.core.mapping import Mapping -from pywy.core.plan import PywyPlan +from pywy.core.core import PywyPlan, Plugin, Translator from pywy.core.platform import Platform -from pywy.core.plugin import Plugin -from pywy.core.translator import Translator __ALL__ = [ Channel, diff --git a/python/src/pywy/core/core.py b/python/src/pywy/core/core.py new file mode 100644 index 00000000..084ffc25 --- /dev/null +++ b/python/src/pywy/core/core.py @@ -0,0 +1,174 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Set, Iterable + +from pywy.core.executor import Executor +from pywy.core.platform import Platform +from pywy.core.mapping import Mapping +from pywy.graph.graph import WayangGraph +from pywy.graph.types import WGraphOfVec, NodeOperator, NodeVec +from pywy.operators import SinkOperator + + +class TranslateContext: + """TranslateContext contextual variables a parameters for the translation + """ + pass + + +class Plugin: + """ TODO: enrich this documentation + A plugin contributes the following components to a :class:`Context` + - mappings + - channels + - configurations + In turn, it may require several :clas:`Platform`s for its operation. + """ + + platforms: Set[Platform] + mappings: Mapping + translate_context: TranslateContext + + def __init__( + self, + platforms: Set[Platform], + mappings: Mapping = Mapping(), + translate_context: TranslateContext = None): + self.platforms = platforms + self.mappings = mappings + self.translate_context = translate_context + + def get_mappings(self) -> Mapping: + return self.mappings + + def get_executor(self) -> Executor: + pass + + def __str__(self): + return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings)) + + def __repr__(self): + return self.__str__() + + +class PywyPlan: + """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator` + + the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe + how the execution needs to be performed + + Attributes + ---------- + graph : :py:class:`pywy.graph.graph.WayangGraph` + Graph that describe the DAG, and it provides the iterable properties to + the PywyPlan + plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin` + plugins is the set of possible platforms that can be uses to execute + the PywyPlan + sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator` + The list of sink operators, this describe the end of the pipeline, and + they are used to build the `graph` + """ + graph: WayangGraph + + def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]): + """basic Constructor of PywyPlan + + this constructor set the plugins and sinks element, and it prepares + everything for been executed + + Parameters + ---------- + plugins + Description of `plugins`. + sinks + Description of `sinks`. + """ + self.plugins = plugins + self.sinks = sinks + self.set_graph() + + def set_graph(self): + """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan + """ + self.graph = WGraphOfVec(self.sinks) + + def execute(self): + """ Execute the plan with the plugin provided at the moment of creation + """ + plug = next(iter(self.plugins)) + trs: Translator = Translator(plug, self) + new_plan = trs.translate() + plug.get_executor().execute(new_plan) + + +class Translator: + """Translator use the :py:class:`pywy.core.Mapping` to convert the :py:class:`pywy.operators.base.PywyOperator` + + Translator take a plan a produce the executable version of the plan using as tool + the :py:class:`pywy.core.Mapping` of the :py:class:`pywy.core.core.Plugin` and convert + the :py:class:`pywy.operators.base.PywyOperator` into an executable version inside + the :py:class:`pywy.core.Platform` + + Attributes + ---------- + plugin : :py:class:`pywy.core.core.Plugin` + plugin use in the translation + plan : :py:class:`pywy.core.core.PywyPlan` + Plan to be translated by the translator + translate_context: :py:class:`pywy.core.core.TranslateContext` + context used by the translates at runtime in some case is not needed + """ + + plugin: Plugin + plan: PywyPlan + translate_context: TranslateContext + + def __init__(self, plugin: Plugin, plan: PywyPlan): + self.plugin = plugin + self.plan = plan + self.translate_context = plugin.translate_context + + def translate(self): + mappings: Mapping = self.plugin.get_mappings() + graph = WGraphOfVec(self.plan.sinks) + + translate = self.translate_context + + def translate2plugin(current_op: NodeVec, next_op: NodeVec): + if current_op is None: + return + + if current_op.current[1] is None: + current_op.current[1] = mappings.get_instanceof(current_op.current[0], **{'translate_context': translate}) + + if next_op is None: + return + if next_op.current[1] is None: + next_op.current[1] = mappings.get_instanceof(next_op.current[0], **{'translate_context': translate}) + + # TODO not necesary it it 0 + current_op.current[1].connect(0, next_op.current[1], 0) + + graph.traversal(graph.starting_nodes, translate2plugin) + + node = [] + for elem in graph.starting_nodes: + node.append(elem.current[1]) + + return PywyPlan({self.plugin}, node) diff --git a/python/src/pywy/core/mapping.py b/python/src/pywy/core/mapping.py index 81a57715..b44375e2 100644 --- a/python/src/pywy/core/mapping.py +++ b/python/src/pywy/core/mapping.py @@ -50,7 +50,7 @@ class Mapping: """ self.mappings[operator.name_basic()] = type(operator) - def get_instanceof(self, operator: PywyOperator): + def get_instanceof(self, operator: PywyOperator, **kwargs): """Instance the executable version of :py:class:`pywy.operators.base.PywyOperator` Parameters @@ -70,7 +70,7 @@ class Mapping: operator.name ) ) - return template(operator) + return template(operator, **kwargs) def __str__(self): return str(self.mappings) diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py deleted file mode 100644 index 48407b98..00000000 --- a/python/src/pywy/core/plan.py +++ /dev/null @@ -1,107 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from typing import (Iterable, Set) - -from pywy.graph.graph import WayangGraph -from pywy.graph.types import (NodeOperator, WGraphOfVec, NodeVec) -from pywy.operators.sink import SinkOperator -from pywy.core.plugin import Plugin - - -class PywyPlan: - """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator` - - the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe - how the execution needs to be performed - - Attributes - ---------- - graph : :py:class:`pywy.graph.graph.WayangGraph` - Graph that describe the DAG, and it provides the iterable properties to - the PywyPlan - plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin` - plugins is the set of possible platforms that can be uses to execute - the PywyPlan - sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator` - The list of sink operators, this describe the end of the pipeline, and - they are used to build the `graph` - """ - graph: WayangGraph - - def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]): - """basic Constructor of PywyPlan - - this constructor set the plugins and sinks element, and it prepares - everything for been executed - - Parameters - ---------- - plugins - Description of `plugins`. - sinks - Description of `sinks`. - """ - self.plugins = plugins - self.sinks = sinks - self.set_graph() - - def set_graph(self): - """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan - """ - self.graph = WGraphOfVec(self.sinks) - - def print(self): - def print_plan(current: NodeOperator, previous: NodeOperator): - if current is None: - print("this is source") - print(previous.current) - return - if previous is None: - print("this is sink") - print(current.current) - return - - print( - "===========\n{}\n@@@@@ => previous is\n{}\n===========\n".format( - current.current, - previous.current - ) - ) - - self.graph.traversal(self.graph.starting_nodes, print_plan) - - def printTuple(self): - def print_plan(current: NodeVec, previous: NodeVec): - if current is None: - print("this is source") - print(previous.current) - return - if previous is None: - print("this is sink") - print(current.current) - return - - print( - "############\n{}\n@@@@@ => previous is\n{}\n############\n" - .format( - current.current, - previous.current - ) - ) - - self.graph.traversal(self.graph.starting_nodes, print_plan) diff --git a/python/src/pywy/core/plugin.py b/python/src/pywy/core/plugin.py deleted file mode 100644 index b79cbf56..00000000 --- a/python/src/pywy/core/plugin.py +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from typing import Set - -from pywy.core.executor import Executor -from pywy.core.platform import Platform -from pywy.core.mapping import Mapping - - -class Plugin: - """ - A plugin contributes the following components to a :class:`Context` - - mappings - - channels - - configurations - In turn, it may require several :clas:`Platform`s for its operation. - """ - - platforms: Set[Platform] - mappings: Mapping - - def __init__(self, platforms: Set[Platform], mappings: Mapping = Mapping()): - self.platforms = platforms - self.mappings = mappings - - def get_mappings(self) -> Mapping: - return self.mappings - - def get_executor(self) -> Executor: - pass - - def __str__(self): - return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings)) - - def __repr__(self): - return self.__str__() diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py deleted file mode 100644 index 7c0b99be..00000000 --- a/python/src/pywy/core/translator.py +++ /dev/null @@ -1,58 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from pywy.graph.types import (WGraphOfVec, NodeVec) -from pywy.core.plugin import Plugin -from pywy.core.plan import PywyPlan -from pywy.core.mapping import Mapping - - -class Translator: - - plugin: Plugin - plan: PywyPlan - - def __init__(self, plugin: Plugin, plan: PywyPlan): - self.plugin = plugin - self.plan = plan - - def translate(self): - mappings: Mapping = self.plugin.get_mappings() - graph = WGraphOfVec(self.plan.sinks) - - def translate2plugin(current_op: NodeVec, next_op: NodeVec): - if current_op is None: - return - - if current_op.current[1] is None: - current_op.current[1] = mappings.get_instanceof(current_op.current[0]) - - if next_op is None: - return - if next_op.current[1] is None: - next_op.current[1] = mappings.get_instanceof(next_op.current[0]) - - # TODO not necesary it it 0 - current_op.current[1].connect(0, next_op.current[1], 0) - - graph.traversal(graph.starting_nodes, translate2plugin) - - node = [] - for elem in graph.starting_nodes: - node.append(elem.current[1]) - - return PywyPlan({self.plugin}, node) diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index 6e811cb1..c0836dfb 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -17,12 +17,10 @@ from typing import Set, List, cast -from pywy.core import Translator +from pywy.core.core import Plugin, PywyPlan from pywy.operators.base import PO_T from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out) from pywy.operators import * -from pywy.core import PywyPlan -from pywy.core import Plugin class WayangContext: @@ -94,13 +92,7 @@ class DataQuanta(GenericTco): ) ) ] - plan = PywyPlan(self.context.plugins, last) - - plug = self.context.plugins.pop() - trs: Translator = Translator(plug, plan) - new_plan = trs.translate() - plug.get_executor().execute(new_plan) - # TODO add the logic to execute the plan + PywyPlan(self.context.plugins, last).execute() def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator: self.operator.connect(0, op, port_op) diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py index 59aabf2a..14620c40 100644 --- a/python/src/pywy/operators/base.py +++ b/python/src/pywy/operators/base.py @@ -38,7 +38,9 @@ class PywyOperator: input_type: TypeVar = None, output_type: TypeVar = None, input_length: Optional[int] = 1, - output_length: Optional[int] = 1 + output_length: Optional[int] = 1, + *args, + **kwargs ): self.name = (self.prefix() + name + self.postfix()).strip() self.inputSlot = [input_type] diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/context.py similarity index 67% copy from python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py copy to python/src/pywy/platforms/jvm/context.py index 0049c5c2..100016dd 100644 --- a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py +++ b/python/src/pywy/platforms/jvm/context.py @@ -15,19 +15,19 @@ # limitations under the License. # -from typing import List, Type - -from pywy.core.channel import CH_T -from pywy.operators.base import PywyOperator +from pywy.core.core import TranslateContext +from pywy.platforms.jvm.serializable.plan_writter import PlanWritter from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator -class JVMExecutionOperator(PywyOperator): +class JVMTranslateContext(TranslateContext): + plan_writer: PlanWritter - dispatch_operator: WayangJVMOperator + def __init__(self): + self.plan_writer = PlanWritter() - def prefix(self) -> str: - return 'JVM' + def add_operator(self, op: WayangJVMOperator): + self.plan_writer.add_operator(op) - def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]): - pass + def generate_request(self): + self.plan_writer.send_message_to_wayang() diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py index 2dad76d6..b64973ef 100644 --- a/python/src/pywy/platforms/jvm/execution.py +++ b/python/src/pywy/platforms/jvm/execution.py @@ -20,6 +20,8 @@ from pywy.core import PywyPlan from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch from pywy.platforms.jvm.operator import JVMExecutionOperator +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator + class JVMExecutor(Executor): @@ -84,16 +86,8 @@ class JVMExecutor(Executor): graph.traversal(graph.starting_nodes, execute) + magic: JVMExecutionOperator = graph.starting_nodes[0].current - - starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator - while starting.previous[0]: - print(starting) - #print(starting.nexts[0]) - starting = starting.previous[0] - if len(starting.previous) == 0 : - break - print(starting) - + magic.translate_context.generate_request() diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py index 0049c5c2..8bc137a7 100644 --- a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py +++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py @@ -19,6 +19,7 @@ from typing import List, Type from pywy.core.channel import CH_T from pywy.operators.base import PywyOperator +from pywy.platforms.jvm.context import JVMTranslateContext from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator @@ -26,6 +27,20 @@ class JVMExecutionOperator(PywyOperator): dispatch_operator: WayangJVMOperator + translate_context: JVMTranslateContext + + def set_context(self, **kwargs): + if 'translate_context' not in kwargs: + return + self.translate_context = kwargs['translate_context'] + + def close_operator(self, op: WayangJVMOperator): + if self.translate_context is None: + return + + self.translate_context.add_operator(op) + + def prefix(self) -> str: return 'JVM' diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py index 555ba048..a047de41 100644 --- a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py +++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py @@ -27,11 +27,12 @@ from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFil class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator): - def __init__(self, origin: TextFileSink = None): + def __init__(self, origin: TextFileSink = None, **kwargs): path = None if origin is None else origin.path type_class = None if origin is None else origin.inputSlot[0] end_line = None if origin is None else origin.end_line super().__init__(path, type_class, end_line) + self.set_context(**kwargs) def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) @@ -45,6 +46,9 @@ class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator): operator.connect_to(0, sink, 0) + self.close_operator(operator) + self.close_operator(sink) + self.dispatch_operator = sink else: raise Exception("Channel Type does not supported") diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py index 3ca3b911..030c702f 100644 --- a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py +++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py @@ -21,15 +21,15 @@ from pywy.core.channel import (CH_T, ChannelDescriptor) from pywy.operators.source import TextFileSource from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator -from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource, WayangJVMOperator class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator): - def __init__(self, origin: TextFileSource = None): + def __init__(self, origin: TextFileSource = None, **kwargs): path = None if origin is None else origin.path super().__init__(path) - pass + self.set_context(**kwargs) def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) diff --git a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py index 06ba9e16..90da7f84 100644 --- a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py +++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py @@ -29,10 +29,10 @@ from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappart class JVMFilterOperator(FilterOperator, JVMExecutionOperator): - def __init__(self, origin: FilterOperator = None): + def __init__(self, origin: FilterOperator = None, **kwargs): predicate = None if origin is None else origin.predicate super().__init__(predicate) - pass + self.set_context(**kwargs) def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) @@ -60,6 +60,7 @@ class JVMFilterOperator(FilterOperator, JVMExecutionOperator): current: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name) # TODO check for the case where the index matter op.connect_to(0, current, 0) + self.close_operator(op) py_out_dispatch_channel.accept_dispatchable(current) else: diff --git a/python/src/pywy/platforms/jvm/plugin.py b/python/src/pywy/platforms/jvm/plugin.py index eff1e04b..eb52b7af 100644 --- a/python/src/pywy/platforms/jvm/plugin.py +++ b/python/src/pywy/platforms/jvm/plugin.py @@ -16,7 +16,8 @@ # from pywy.core import Executor -from pywy.core import Plugin +from pywy.core.core import Plugin +from pywy.platforms.jvm.context import JVMTranslateContext from pywy.platforms.jvm.execution import JVMExecutor from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS from pywy.platforms.jvm.platform import JVMPlatform @@ -25,7 +26,7 @@ from pywy.platforms.jvm.platform import JVMPlatform class JVMPlugin(Plugin): def __init__(self): - super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS) + super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS, JVMTranslateContext()) def get_executor(self) -> Executor: return JVMExecutor() diff --git a/python/src/pywy/platforms/jvm/serializable/plan_writter.py b/python/src/pywy/platforms/jvm/serializable/plan_writter.py new file mode 100644 index 00000000..17929181 --- /dev/null +++ b/python/src/pywy/platforms/jvm/serializable/plan_writter.py @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Set + +import pywy.platforms.jvm.serializable.pywayangplan_pb2 as pwb +import os +import cloudpickle +import logging +import requests +import base64 + + +# Writes Wayang Plan from several stages +from pywy.exception import PywyException +from pywy.operators import SinkOperator +from pywy.operators.source import SourceUnaryOperator +from pywy.operators.unary import UnaryToUnaryOperator +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMTextFileSink + + +class PlanWritter: + + def __init__(self): + self.originals: Set[WayangJVMOperator] = set() + + def add_operator(self, operator: WayangJVMOperator): + self.originals.add(operator) + + def add_proto_unary_operator(self, operator: WayangJVMOperator): + op = pwb.OperatorProto() + op.id = str(operator.name) + op.type = operator.kind + op.udf = cloudpickle.dumps(operator.udf) + op.path = str(None) + return op + + def add_proto_source_operator(self, operator: WayangJVMOperator): + source = pwb.OperatorProto() + source.id = str(operator.name) + source.type = operator.kind + source.path = os.path.abspath(operator.path) + source.udf = chr(0).encode('utf-8') + return source + + def add_proto_sink_operator(self, operator: WayangJVMOperator): + sink = pwb.OperatorProto() + sink.id = str(operator.name) + sink.type = operator.kind + sink.path = os.path.abspath(operator.path) + sink.udf = chr(0).encode('utf-8') + return sink + + def send_message_to_wayang(self): + connections = {} + sources = [] + sinks = [] + operators = [] + for operator in self.originals: + if not operator.is_unary(): + raise PywyException( + "the not unary operator are not supported".format( + type(operator), + operator + ) + ) + if operator.is_operator(): + connections[operator] = self.add_proto_unary_operator(operator) + operators.append(connections[operator]) + elif operator.is_source(): + connections[operator] = self.add_proto_source_operator(operator) + sources.append(connections[operator]) + elif operator.is_sink(): + connections[operator] = self.add_proto_sink_operator(operator) + sinks.append(connections[operator]) + else: + raise PywyException( + "the type {} for the operator {} is not supported {}".format( + type(operator), + operator, + WayangJVMTextFileSink.mro() + ) + ) + for operator in self.originals: + current = connections[operator] + for ele in operator.previous: + current.predecessors.append(connections.get(ele).id) + for ele in operator.nexts: + current.successors.append(connections.get(ele).id) + + plan_configuration = pwb.WayangPlanProto() + + plan = pwb.PlanProto() + plan.sources.extend(sources) + plan.operators.extend(operators) + plan.sinks.extend(sinks) + plan.input = pwb.PlanProto.string + plan.output = pwb.PlanProto.string + + ctx = pwb.ContextProto() + ctx.platforms.extend([pwb.ContextProto.PlatformProto.java]) + + plan_configuration.plan.CopyFrom(plan) + plan_configuration.context.CopyFrom(ctx) + + print("plan!") + print(plan_configuration) + + msg_bytes = plan_configuration.SerializeToString() + msg_64 = base64.b64encode(msg_bytes) + + logging.debug(msg_bytes) + # response = requests.get("http://localhost:8080/plan/create/fromfile") + data = { + 'message': msg_64 + } + response = requests.post("http://localhost:8080/plan/create", data) + logging.debug(response) diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py index d18f155b..ab3a1a4e 100644 --- a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py +++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py @@ -71,6 +71,18 @@ class WayangJVMOperator: self.nexts ) + def is_source(self): + return False + + def is_sink(self): + return False + + def is_unary(self): + return True + + def is_operator(self): + return False + WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator) @@ -80,6 +92,8 @@ class WayangJVMMappartitionOperator(WayangJVMOperator): super().__init__("MapPartitionOperator", name) self.udf = udf + def is_operator(self): + return True class WayangJVMTextFileSource(WayangJVMOperator): @@ -87,6 +101,8 @@ class WayangJVMTextFileSource(WayangJVMOperator): super().__init__("TextFileSource", name) self.path = path + def is_source(self): + return True class WayangJVMTextFileSink(WayangJVMOperator): @@ -94,3 +110,5 @@ class WayangJVMTextFileSink(WayangJVMOperator): super().__init__("TextFileSink", name) self.path = path + def is_sink(self): + return True diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py index 7d8eec1b..bccab733 100644 --- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py @@ -28,7 +28,7 @@ from pywy.platforms.python.channels import ( class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): - def __init__(self, origin: TextFileSink = None): + def __init__(self, origin: TextFileSink = None, **kwargs): path = None if origin is None else origin.path type_class = None if origin is None else origin.inputSlot[0] end_line = None if origin is None else origin.end_line diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py index 245d090d..f5651e79 100644 --- a/python/src/pywy/platforms/python/operator/py_source_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py @@ -28,7 +28,7 @@ from pywy.platforms.python.channels import ( class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator): - def __init__(self, origin: TextFileSource = None): + def __init__(self, origin: TextFileSource = None, **kwargs): path = None if origin is None else origin.path super().__init__(path) pass diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py index 2d807282..200f85f1 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_filter.py +++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py @@ -32,7 +32,7 @@ from pywy.platforms.python.channels import ( class PyFilterOperator(FilterOperator, PyExecutionOperator): - def __init__(self, origin: FilterOperator = None): + def __init__(self, origin: FilterOperator = None, **kwargs): predicate = None if origin is None else origin.predicate super().__init__(predicate) pass diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py index 72016a8c..43a02465 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py +++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py @@ -33,7 +33,7 @@ from pywy.platforms.python.channels import ( class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator): - def __init__(self, origin: FlatmapOperator = None): + def __init__(self, origin: FlatmapOperator = None, **kwargs): fm_function = None if origin is None else origin.fm_function super().__init__(fm_function) diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py index a8e53a48..54418d46 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_map.py +++ b/python/src/pywy/platforms/python/operator/py_unary_map.py @@ -32,7 +32,7 @@ from pywy.platforms.python.channels import ( class PyMapOperator(MapOperator, PyExecutionOperator): - def __init__(self, origin: MapOperator = None): + def __init__(self, origin: MapOperator = None, **kwargs): function = None if origin is None else origin.function super().__init__(function) pass diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py index 987e7334..c3bb3e39 100644 --- a/python/src/pywy/tests/integration/jvm_platform_test.py +++ b/python/src/pywy/tests/integration/jvm_platform_test.py @@ -80,7 +80,6 @@ class TestIntegrationJVMPlatform(unittest.TestCase): self.assertEqual(lines_filter, lines_platform) def test_grep(self): - dq, path_tmp, pre = self.seed_small_grep(self.file_10e0) dq.store_textfile(path_tmp)
