This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch wayang-211 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 8ec4cc82d651014affdcdbb53c31b24c33ca7912 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Mon Apr 11 18:40:46 2022 +0200 [WAYANG-#211] seed structure for JVM-Platform Signed-off-by: bertty <[email protected]> --- python/src/pywy/platforms/commons/channels.py | 6 + .../pywy/{plugins.py => platforms/jvm/__init__.py} | 11 -- .../pywy/platforms/{commons => jvm}/channels.py | 46 +++---- python/src/pywy/platforms/jvm/execution.py | 99 ++++++++++++++ python/src/pywy/platforms/jvm/graph.py | 52 ++++++++ .../pywy/{plugins.py => platforms/jvm/mappings.py} | 17 ++- .../jvm/operator/__init__.py} | 19 +-- .../jvm/operator/jvm_execution_operator.py} | 24 ++-- .../platforms/jvm/operator/jvm_sink_textfile.py | 56 ++++++++ .../platforms/jvm/operator/jvm_source_textfile.py | 48 +++++++ .../platforms/jvm/operator/jvm_unary_filter.py | 72 +++++++++++ .../pywy/{plugins.py => platforms/jvm/platform.py} | 13 +- .../pywy/{plugins.py => platforms/jvm/plugin.py} | 20 +-- .../jvm/serializable/__init__.py} | 11 -- .../jvm/serializable/wayang_jvm_operator.py | 96 ++++++++++++++ python/src/pywy/plugins.py | 2 + .../pywy/tests/integration/jvm_platform_test.py | 144 +++++++++++++++++++++ 17 files changed, 642 insertions(+), 94 deletions(-) diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/commons/channels.py index 743a7169..57cebeb1 100644 --- a/python/src/pywy/platforms/commons/channels.py +++ b/python/src/pywy/platforms/commons/channels.py @@ -17,6 +17,7 @@ from typing import Callable from pywy.core import (Channel, ChannelDescriptor) +from pywy.exception import PywyException class CommonsCallableChannel(Channel): @@ -35,6 +36,11 @@ class CommonsCallableChannel(Channel): @staticmethod def concatenate(function_a: Callable, function_b: Callable): + if function_a is None: + raise PywyException("the function_a can't be None") + if function_b is None: + return function_a + def executable(iterable): return function_a(function_b(iterable)) 
return executable diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/__init__.py similarity index 70% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/__init__.py index e3509b68..d9e26de2 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/__init__.py @@ -14,14 +14,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -from pywy.core.platform import Platform -from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin - -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/jvm/channels.py similarity index 51% copy from python/src/pywy/platforms/commons/channels.py copy to python/src/pywy/platforms/jvm/channels.py index 743a7169..9c1e76f2 100644 --- a/python/src/pywy/platforms/commons/channels.py +++ b/python/src/pywy/platforms/jvm/channels.py @@ -14,46 +14,34 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - from typing import Callable + from pywy.core import (Channel, ChannelDescriptor) +from pywy.exception import PywyException +from pywy.platforms.commons.channels import CommonsCallableChannel +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMMappartitionOperator -class CommonsCallableChannel(Channel): +class DispatchableChannel(CommonsCallableChannel): - udf: Callable + operator: WayangJVMOperator def __init__(self): Channel.__init__(self) + self.udf = None + self.operator = None - def provide_callable(self) -> Callable: - return self.udf - - def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel': - self.udf = udf - return self - - @staticmethod - def concatenate(function_a: Callable, function_b: Callable): - def executable(iterable): - return function_a(function_b(iterable)) - return executable - - -class CommonsFileChannel(Channel): - - path: str - - def __init__(self): - Channel.__init__(self) + def provide_dispatchable(self, do_wrapper: bool = False) -> WayangJVMOperator: + if self.operator is None: + raise PywyException("The operator was not define") + if do_wrapper: + self.operator.udf = self.udf - def provide_path(self) -> str: - return self.path + return self.operator - def accept_path(self, path: str) -> 'PyIteratorChannel': - self.path = path + def accept_dispatchable(self, operator: WayangJVMOperator) -> 'DispatchableChannel': # stores the operator and returns this channel (fluent), like accept_callable + self.operator = operator return self -COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False) -COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False) +DISPATCHABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(DispatchableChannel()), True, True) diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py new file mode 100644 index 00000000..2dad76d6 --- /dev/null +++ b/python/src/pywy/platforms/jvm/execution.py @@ -0,0 +1,99 @@ +# +# Licensed to the
Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pywy.core import Executor, ChannelDescriptor +from pywy.core import PywyPlan +from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR +from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch +from pywy.platforms.jvm.operator import JVMExecutionOperator + +class JVMExecutor(Executor): + + def __init__(self): + super(JVMExecutor, self).__init__() + + def execute(self, plan): + pywyPlan: PywyPlan = plan + graph = WGraphDispatch(pywyPlan.sinks) + + # TODO get this information by a configuration and ideally by the context + descriptor_default: ChannelDescriptor = DISPATCHABLE_CHANNEL_DESCRIPTOR + + def execute(op_current: NodeDispatch, op_next: NodeDispatch): + if op_current is None: + return + + jvm_current: JVMExecutionOperator = op_current.current + if jvm_current.outputs == 0: + jvm_current.execute(jvm_current.inputChannel, []) + return + + if op_next is None: + return + + jvm_next: JVMExecutionOperator = op_next.current + outputs = jvm_current.get_output_channeldescriptors() + inputs = jvm_next.get_input_channeldescriptors() + + intersect = outputs.intersection(inputs) + if len(intersect) == 0: + raise Exception( + "The operator(A) {} can't connect with (B) {}, " + "because 
the output of (A) is {} and the input of (B) is {} ".format( + jvm_current, + jvm_next, + outputs, + inputs + ) + ) + + if len(intersect) > 1: + if descriptor_default is None: + raise Exception( + "The interaction between the operator (A) {} and (B) {}, " + "can't be decided because are several channel availables {}".format( + jvm_current, + jvm_next, + intersect + ) + ) + descriptor = descriptor_default + else: + descriptor = intersect.pop() + + # TODO validate if is valite for several output + jvm_current.outputChannel[0] = descriptor.create_instance() + + jvm_current.execute(jvm_current.inputChannel, jvm_current.outputChannel) + + jvm_next.inputChannel = jvm_current.outputChannel + + graph.traversal(graph.starting_nodes, execute) + + + + starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator + while starting.previous[0]: + print(starting) + #print(starting.nexts[0]) + starting = starting.previous[0] + if len(starting.previous) == 0 : + break + print(starting) + + + diff --git a/python/src/pywy/platforms/jvm/graph.py b/python/src/pywy/platforms/jvm/graph.py new file mode 100644 index 00000000..78636517 --- /dev/null +++ b/python/src/pywy/platforms/jvm/graph.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Iterable, List, Tuple + +from pywy.graph.graph import WayangGraph, GraphNode +from pywy.operators.base import PO_T +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WJO_T + + +class NodeDispatch(GraphNode[PO_T, PO_T]): + wop: WJO_T + + def __init__(self, op: PO_T): + super(NodeDispatch, self).__init__(op) + + def get_adjacents(self) -> List[PO_T]: + operator: PO_T = self.current + if operator is None or operator.inputs == 0: + return [] + return operator.inputOperator + + def build_node(self, t: PO_T) -> 'NodeDispatch': + return NodeDispatch(t) + + def __str__(self): + return "NodeDispatch {}".format(self.current) + + def __repr__(self): + return self.__str__() + + +class WGraphDispatch(WayangGraph[PO_T, NodeDispatch]): + + def __init__(self, nodes: Iterable[PO_T]): + super(WGraphDispatch, self).__init__(nodes) + + def build_node(self, t: PO_T) -> NodeDispatch: + return NodeDispatch(t) diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/mappings.py similarity index 70% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/mappings.py index e3509b68..4709a735 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/mappings.py @@ -15,13 +15,12 @@ # limitations under the License. 
# -from pywy.core.platform import Platform -from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin +from pywy.core import Mapping +from pywy.platforms.jvm.operator import * -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() + +JVM_OPERATOR_MAPPINGS = Mapping() + +JVM_OPERATOR_MAPPINGS.add_mapping(JVMFilterOperator()) +JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSourceOperator()) +JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSinkOperator()) diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/operator/__init__.py similarity index 63% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/operator/__init__.py index e3509b68..a0bc5d29 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/operator/__init__.py @@ -15,13 +15,14 @@ # limitations under the License. 
# -from pywy.core.platform import Platform -from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin +from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator +from pywy.platforms.jvm.operator.jvm_unary_filter import JVMFilterOperator +from pywy.platforms.jvm.operator.jvm_source_textfile import JVMTextFileSourceOperator +from pywy.platforms.jvm.operator.jvm_sink_textfile import JVMTextFileSinkOperator -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() +__all__ = [ # public names exported by `from pywy.platforms.jvm.operator import *`; must be strings + 'JVMExecutionOperator', + 'JVMFilterOperator', + 'JVMTextFileSourceOperator', + 'JVMTextFileSinkOperator', +] diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py similarity index 65% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py index e3509b68..0049c5c2 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py @@ -15,13 +15,19 @@ # limitations under the License.
# -from pywy.core.platform import Platform -from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin +from typing import List, Type -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() +from pywy.core.channel import CH_T +from pywy.operators.base import PywyOperator +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator + + +class JVMExecutionOperator(PywyOperator): + + dispatch_operator: WayangJVMOperator + + def prefix(self) -> str: + return 'JVM' + + def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]): + pass diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py new file mode 100644 index 00000000..555ba048 --- /dev/null +++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from typing import Set, List, Type + +from pywy.core.channel import (CH_T, ChannelDescriptor) +from pywy.exception import PywyException +from pywy.operators.sink import TextFileSink +from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel +from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSink + + +class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator): + + def __init__(self, origin: TextFileSink = None): + path = None if origin is None else origin.path + type_class = None if origin is None else origin.inputSlot[0] + end_line = None if origin is None else origin.end_line + super().__init__(path, type_class, end_line) + + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): + self.validate_channels(inputs, outputs) + + if isinstance(inputs[0], DispatchableChannel): + + py_in_dispatch_channel: DispatchableChannel = inputs[0] + operator = py_in_dispatch_channel.provide_dispatchable(do_wrapper=True) + + sink: WayangJVMTextFileSink = WayangJVMTextFileSink(self.name, self.path) + + operator.connect_to(0, sink, 0) + + self.dispatch_operator = sink + else: + raise Exception("Channel Type does not supported") + + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: + return {DISPATCHABLE_CHANNEL_DESCRIPTOR} + + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: + raise Exception("The JVMTextFileSink does not support Output Channels") diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py new file mode 100644 index 00000000..3ca3b911 --- /dev/null +++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Set, List, Type + +from pywy.core.channel import (CH_T, ChannelDescriptor) +from pywy.operators.source import TextFileSource +from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel +from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource + + +class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator): + + def __init__(self, origin: TextFileSource = None): + path = None if origin is None else origin.path + super().__init__(path) + pass + + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): + self.validate_channels(inputs, outputs) + if isinstance(outputs[0], DispatchableChannel): + py_out_dispatch_channel: DispatchableChannel = outputs[0] + py_out_dispatch_channel.accept_dispatchable( + WayangJVMTextFileSource(self.name, self.path) + ) + else: + raise Exception("Channel Type does not supported") + + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: + raise Exception("The JVMTextFileSource does not support Input Channels") + + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: + return {DISPATCHABLE_CHANNEL_DESCRIPTOR} diff --git 
a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py new file mode 100644 index 00000000..06ba9e16 --- /dev/null +++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from typing import Set, List, Type + +from pywy.core.channel import CH_T, ChannelDescriptor +from pywy.operators.unary import FilterOperator +from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel +from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator +from pywy.platforms.commons.channels import ( + CommonsCallableChannel +) +from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappartitionOperator, WayangJVMOperator + + +class JVMFilterOperator(FilterOperator, JVMExecutionOperator): + + def __init__(self, origin: FilterOperator = None): + predicate = None if origin is None else origin.predicate + super().__init__(predicate) + pass + + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): + self.validate_channels(inputs, outputs) + udf = self.predicate + if isinstance(inputs[0], DispatchableChannel): + py_in_dispatch_channel: DispatchableChannel = inputs[0] + py_out_dispatch_channel: DispatchableChannel = outputs[0] + + def func(iterator): + return filter(udf, iterator) + + py_out_dispatch_channel.accept_callable( + CommonsCallableChannel.concatenate( + func, + py_in_dispatch_channel.provide_callable() + ) + ) + + op: WayangJVMOperator = py_in_dispatch_channel.provide_dispatchable() + + if isinstance(op, WayangJVMMappartitionOperator): + py_out_dispatch_channel.accept_dispatchable(op) + return + + current: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name) + # TODO check for the case where the index matter + op.connect_to(0, current, 0) + py_out_dispatch_channel.accept_dispatchable(current) + + else: + raise Exception("Channel Type does not supported") + + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: + return {DISPATCHABLE_CHANNEL_DESCRIPTOR} + + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: + return {DISPATCHABLE_CHANNEL_DESCRIPTOR} diff --git a/python/src/pywy/plugins.py 
b/python/src/pywy/platforms/jvm/platform.py similarity index 74% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/platform.py index e3509b68..d52ac471 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/platform.py @@ -16,12 +16,9 @@ # from pywy.core.platform import Platform -from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() + +class JVMPlatform(Platform): + + def __init__(self): + super(JVMPlatform, self).__init__("JVM") diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/plugin.py similarity index 67% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/plugin.py index e3509b68..eff1e04b 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/plugin.py @@ -15,13 +15,17 @@ # limitations under the License. 
# -from pywy.core.platform import Platform +from pywy.core import Executor from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin +from pywy.platforms.jvm.execution import JVMExecutor +from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS +from pywy.platforms.jvm.platform import JVMPlatform -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() + +class JVMPlugin(Plugin): + + def __init__(self): + super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS) + + def get_executor(self) -> Executor: + return JVMExecutor() diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/serializable/__init__.py similarity index 70% copy from python/src/pywy/plugins.py copy to python/src/pywy/platforms/jvm/serializable/__init__.py index e3509b68..d9e26de2 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/platforms/jvm/serializable/__init__.py @@ -14,14 +14,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -from pywy.core.platform import Platform -from pywy.core import Plugin -from pywy.platforms.python.plugin import PythonPlugin - -# define the basic plugins that can be used -JAVA = Plugin({Platform('java')}) -SPARK = Plugin({Platform('spark')}) -FLINK = Plugin({Platform('flink')}) -# plugin for the python platform -PYTHON = PythonPlugin() diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py new file mode 100644 index 00000000..d18f155b --- /dev/null +++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Callable, List, TypeVar + +from pywy.exception import PywyException + + +class WayangJVMOperator: + + kind: str + name: str + path: str + udf: Callable + + previous: List['WayangJVMOperator'] + nexts: List['WayangJVMOperator'] + + def __init__(self, kind, name): + self.name = name + self.kind = kind + self.previous = [] + self.nexts = [] + + def validate_vector(self, vect: List['WayangJVMOperator'], index: int, op: 'WayangJVMOperator' = None): + if op is None: + op = self + + if vect is None or len(vect) == 0: + vect = [None] * (index + 1) + + if len(vect) <= index: # pad with None so that vect[index] is addressable (len == index also needs padding) + vect.extend([None for i in range(index + 1 - len(vect))]) + + if vect[index] is not None: + raise PywyException( + 'the position in the index "{}" is already in use for "{}" in the operator "{}"'.format( + index, + vect[index], + op + ) + ) + + return vect + + def connect_to(self, nexts_index: int, operator: 'WayangJVMOperator', previous_index: int) -> 'WayangJVMOperator': + operator.previous = self.validate_vector(operator.previous, previous_index, operator) + self.nexts = self.validate_vector(self.nexts, nexts_index) + + self.nexts[nexts_index] = operator + operator.previous[previous_index] = self + return self + + def __str__(self): + return "WayangJVMOperator {}, previous.[{}], nexts.[{}]".format(
self.name, + self.previous, + self.nexts + ) + +WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator) + + +class WayangJVMMappartitionOperator(WayangJVMOperator): + + def __init__(self, name: str, udf: Callable = None): + super().__init__("MapPartitionOperator", name) + self.udf = udf + + +class WayangJVMTextFileSource(WayangJVMOperator): + + def __init__(self, name: str, path: str): + super().__init__("TextFileSource", name) + self.path = path + + +class WayangJVMTextFileSink(WayangJVMOperator): + + def __init__(self, name: str, path: str): + super().__init__("TextFileSink", name) + self.path = path + diff --git a/python/src/pywy/plugins.py b/python/src/pywy/plugins.py index e3509b68..777d2976 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/plugins.py @@ -17,6 +17,7 @@ from pywy.core.platform import Platform from pywy.core import Plugin +from pywy.platforms.jvm.plugin import JVMPlugin from pywy.platforms.python.plugin import PythonPlugin # define the basic plugins that can be used @@ -25,3 +26,4 @@ SPARK = Plugin({Platform('spark')}) FLINK = Plugin({Platform('flink')}) # plugin for the python platform PYTHON = PythonPlugin() +JVMs = JVMPlugin() diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py new file mode 100644 index 00000000..987e7334 --- /dev/null +++ b/python/src/pywy/tests/integration/jvm_platform_test.py @@ -0,0 +1,144 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import os +import unittest +import tempfile +from itertools import chain +from typing import List, Iterable + +from pywy.config import RC_TEST_DIR as ROOT +from pywy.dataquanta import WayangContext +from pywy.plugins import JVMs + +logger = logging.getLogger(__name__) + + +class TestIntegrationJVMPlatform(unittest.TestCase): + + file_10e0: str + + def setUp(self): + self.file_10e0 = "{}/10e0MB.input".format(ROOT) + + @staticmethod + def seed_small_grep(validation_file): + def pre(a: str) -> bool: + return 'six' in a + + fd, path_tmp = tempfile.mkstemp() + + dq = WayangContext() \ + .register(JVMs) \ + .textfile(validation_file) \ + .filter(pre) + + return dq, path_tmp, pre + + def validate_files(self, + validation_file, + outputed_file, + read_and_convert_validation, + read_and_convert_outputed, + delete_outputed=True, + print_variable=False): + lines_filter: List[int] + with open(validation_file, 'r') as f: + lines_filter = list(read_and_convert_validation(f)) + selectivity = len(lines_filter) + + lines_platform: List[int] + with open(outputed_file, 'r') as fp: + lines_platform = list(read_and_convert_outputed(fp)) + elements = len(lines_platform) + + if delete_outputed: + os.remove(outputed_file) + + if print_variable: + logger.info(f"{lines_platform=}") + logger.info(f"{lines_filter=}") + logger.info(f"{elements=}") + logger.info(f"{selectivity=}") + + self.assertEqual(selectivity, elements) + self.assertEqual(lines_filter, lines_platform) + + def test_grep(self): + + dq, path_tmp, pre = self.seed_small_grep(self.file_10e0) + + 
dq.store_textfile(path_tmp) + + def convert_validation(file): + return filter(pre, file.readlines()) + + def convert_outputed(file): + return file.readlines() + + self.validate_files( + self.file_10e0, + path_tmp, + convert_validation, + convert_outputed + ) + + # def test_dummy_map(self): + # + # def convert(a: str) -> int: + # return len(a) + # + # dq, path_tmp, pre = self.seed_small_grep(self.file_10e0) + # + # dq.map(convert) \ + # .store_textfile(path_tmp) + # + # def convert_validation(file): + # return map(convert, filter(pre, file.readlines())) + # + # def convert_outputed(file): + # return map(lambda x: int(x), file.read().splitlines()) + # + # self.validate_files( + # self.file_10e0, + # path_tmp, + # convert_validation, + # convert_outputed + # ) + # + # def test_dummy_flatmap(self): + # def fm_func(string: str) -> Iterable[str]: + # return string.strip().split(" ") + # + # dq, path_tmp, pre = self.seed_small_grep(self.file_10e0) + # + # dq.flatmap(fm_func) \ + # .store_textfile(path_tmp, '\n') + # + # def convert_validation(file): + # return chain.from_iterable(map(fm_func, filter(pre, file.readlines()))) + # + # def convert_outputed(file): + # return file.read().splitlines() + # + # self.validate_files( + # self.file_10e0, + # path_tmp, + # convert_validation, + # convert_outputed + # )
