This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f738e66fd4db66b08c2c0e67f35ca3101c9a3bf5
Author: bertty Contreras <[email protected]>
AuthorDate: Tue Apr 12 00:32:13 2022 +0200

    [WAYANG-#211] JVM-Platform translator almost done
    
    Signed-off-by: bertty <[email protected]>
---
 python/src/pywy/core/__init__.py                   |   4 +-
 python/src/pywy/core/core.py                       | 174 +++++++++++++++++++++
 python/src/pywy/core/mapping.py                    |   4 +-
 python/src/pywy/core/plan.py                       | 107 -------------
 python/src/pywy/core/plugin.py                     |  51 ------
 python/src/pywy/core/translator.py                 |  58 -------
 python/src/pywy/dataquanta.py                      |  12 +-
 python/src/pywy/operators/base.py                  |   4 +-
 .../jvm_execution_operator.py => context.py}       |  20 +--
 python/src/pywy/platforms/jvm/execution.py         |  14 +-
 .../jvm/operator/jvm_execution_operator.py         |  15 ++
 .../platforms/jvm/operator/jvm_sink_textfile.py    |   6 +-
 .../platforms/jvm/operator/jvm_source_textfile.py  |   6 +-
 .../platforms/jvm/operator/jvm_unary_filter.py     |   5 +-
 python/src/pywy/platforms/jvm/plugin.py            |   5 +-
 .../platforms/jvm/serializable/plan_writter.py     | 131 ++++++++++++++++
 .../jvm/serializable/wayang_jvm_operator.py        |  18 +++
 .../platforms/python/operator/py_sink_textfile.py  |   2 +-
 .../python/operator/py_source_textfile.py          |   2 +-
 .../platforms/python/operator/py_unary_filter.py   |   2 +-
 .../platforms/python/operator/py_unary_flatmap.py  |   2 +-
 .../pywy/platforms/python/operator/py_unary_map.py |   2 +-
 .../pywy/tests/integration/jvm_platform_test.py    |   1 -
 23 files changed, 379 insertions(+), 266 deletions(-)

diff --git a/python/src/pywy/core/__init__.py b/python/src/pywy/core/__init__.py
index a28e3fcd..f9be82ca 100644
--- a/python/src/pywy/core/__init__.py
+++ b/python/src/pywy/core/__init__.py
@@ -18,10 +18,8 @@
 from pywy.core.channel import Channel, ChannelDescriptor
 from pywy.core.executor import Executor
 from pywy.core.mapping import Mapping
-from pywy.core.plan import PywyPlan
+from pywy.core.core import PywyPlan, Plugin, Translator
 from pywy.core.platform import Platform
-from pywy.core.plugin import Plugin
-from pywy.core.translator import Translator
 
 __ALL__ = [
     Channel,
diff --git a/python/src/pywy/core/core.py b/python/src/pywy/core/core.py
new file mode 100644
index 00000000..084ffc25
--- /dev/null
+++ b/python/src/pywy/core/core.py
@@ -0,0 +1,174 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, Iterable
+
+from pywy.core.executor import Executor
+from pywy.core.platform import Platform
+from pywy.core.mapping import Mapping
+from pywy.graph.graph import WayangGraph
+from pywy.graph.types import WGraphOfVec, NodeOperator, NodeVec
+from pywy.operators import SinkOperator
+
+
+class TranslateContext:
+    """TranslateContext contextual variables a parameters for the translation
+    """
+    pass
+
+
+class Plugin:
+    """ TODO: enrich this documentation
+    A plugin contributes the following components to a :class:`Context`
+    - mappings
+    - channels
+    - configurations
+    In turn, it may require several :clas:`Platform`s for its operation.
+    """
+
+    platforms: Set[Platform]
+    mappings: Mapping
+    translate_context: TranslateContext
+
+    def __init__(
+            self,
+            platforms: Set[Platform],
+            mappings: Mapping = Mapping(),
+            translate_context: TranslateContext = None):
+        self.platforms = platforms
+        self.mappings = mappings
+        self.translate_context = translate_context
+
+    def get_mappings(self) -> Mapping:
+        return self.mappings
+
+    def get_executor(self) -> Executor:
+        pass
+
+    def __str__(self):
+        return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class PywyPlan:
+    """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator`
+
+    the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe
+    how the execution needs to be performed
+
+    Attributes
+    ----------
+    graph : :py:class:`pywy.graph.graph.WayangGraph`
+       Graph that describe the DAG, and it provides the iterable properties to
+       the PywyPlan
+    plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin`
+        plugins is the set of possible platforms that can be uses to execute
+        the PywyPlan
+    sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator`
+        The list of sink operators, this describe the end of the pipeline, and
+        they are used to build the `graph`
+    """
+    graph: WayangGraph
+
+    def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
+        """basic Constructor of PywyPlan
+
+        this constructor set the plugins and sinks element, and it prepares
+        everything for been executed
+
+        Parameters
+        ----------
+        plugins
+            Description of `plugins`.
+        sinks
+            Description of `sinks`.
+        """
+        self.plugins = plugins
+        self.sinks = sinks
+        self.set_graph()
+
+    def set_graph(self):
+        """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan
+        """
+        self.graph = WGraphOfVec(self.sinks)
+
+    def execute(self):
+        """ Execute the plan with the plugin provided at the moment of creation
+        """
+        plug = next(iter(self.plugins))
+        trs: Translator = Translator(plug, self)
+        new_plan = trs.translate()
+        plug.get_executor().execute(new_plan)
+
+
+class Translator:
+    """Translator use the :py:class:`pywy.core.Mapping` to convert the :py:class:`pywy.operators.base.PywyOperator`
+
+    Translator take a plan a produce the executable version of the plan using as tool
+    the :py:class:`pywy.core.Mapping` of the :py:class:`pywy.core.core.Plugin` and convert
+    the :py:class:`pywy.operators.base.PywyOperator` into an executable version inside
+    the :py:class:`pywy.core.Platform`
+
+    Attributes
+    ----------
+    plugin : :py:class:`pywy.core.core.Plugin`
+        plugin use in the translation
+    plan : :py:class:`pywy.core.core.PywyPlan`
+        Plan to be translated by the translator
+    translate_context: :py:class:`pywy.core.core.TranslateContext`
+        context used by the translates at runtime in some case is not needed
+    """
+
+    plugin: Plugin
+    plan: PywyPlan
+    translate_context: TranslateContext
+
+    def __init__(self, plugin: Plugin, plan: PywyPlan):
+        self.plugin = plugin
+        self.plan = plan
+        self.translate_context = plugin.translate_context
+
+    def translate(self):
+        mappings: Mapping = self.plugin.get_mappings()
+        graph = WGraphOfVec(self.plan.sinks)
+
+        translate = self.translate_context
+
+        def translate2plugin(current_op: NodeVec, next_op: NodeVec):
+            if current_op is None:
+                return
+
+            if current_op.current[1] is None:
+                current_op.current[1] = mappings.get_instanceof(current_op.current[0], **{'translate_context': translate})
+
+            if next_op is None:
+                return
+            if next_op.current[1] is None:
+                next_op.current[1] = mappings.get_instanceof(next_op.current[0], **{'translate_context': translate})
+
+            # TODO not necesary it it 0
+            current_op.current[1].connect(0, next_op.current[1], 0)
+
+        graph.traversal(graph.starting_nodes, translate2plugin)
+
+        node = []
+        for elem in graph.starting_nodes:
+            node.append(elem.current[1])
+
+        return PywyPlan({self.plugin}, node)
diff --git a/python/src/pywy/core/mapping.py b/python/src/pywy/core/mapping.py
index 81a57715..b44375e2 100644
--- a/python/src/pywy/core/mapping.py
+++ b/python/src/pywy/core/mapping.py
@@ -50,7 +50,7 @@ class Mapping:
         """
         self.mappings[operator.name_basic()] = type(operator)
 
-    def get_instanceof(self, operator: PywyOperator):
+    def get_instanceof(self, operator: PywyOperator, **kwargs):
         """Instance the executable version of :py:class:`pywy.operators.base.PywyOperator`
 
         Parameters
@@ -70,7 +70,7 @@ class Mapping:
                     operator.name
                 )
             )
-        return template(operator)
+        return template(operator, **kwargs)
 
     def __str__(self):
         return str(self.mappings)
diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py
deleted file mode 100644
index 48407b98..00000000
--- a/python/src/pywy/core/plan.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-from typing import (Iterable, Set)
-
-from pywy.graph.graph import WayangGraph
-from pywy.graph.types import (NodeOperator, WGraphOfVec, NodeVec)
-from pywy.operators.sink import SinkOperator
-from pywy.core.plugin import Plugin
-
-
-class PywyPlan:
-    """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator`
-
-    the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe
-    how the execution needs to be performed
-
-    Attributes
-    ----------
-    graph : :py:class:`pywy.graph.graph.WayangGraph`
-       Graph that describe the DAG, and it provides the iterable properties to
-       the PywyPlan
-    plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin`
-        plugins is the set of possible platforms that can be uses to execute
-        the PywyPlan
-    sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator`
-        The list of sink operators, this describe the end of the pipeline, and
-        they are used to build the `graph`
-    """
-    graph: WayangGraph
-
-    def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
-        """basic Constructor of PywyPlan
-
-        this constructor set the plugins and sinks element, and it prepares
-        everything for been executed
-
-        Parameters
-        ----------
-        plugins
-            Description of `plugins`.
-        sinks
-            Description of `sinks`.
-        """
-        self.plugins = plugins
-        self.sinks = sinks
-        self.set_graph()
-
-    def set_graph(self):
-        """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan
-        """
-        self.graph = WGraphOfVec(self.sinks)
-
-    def print(self):
-        def print_plan(current: NodeOperator, previous: NodeOperator):
-            if current is None:
-                print("this is source")
-                print(previous.current)
-                return
-            if previous is None:
-                print("this is sink")
-                print(current.current)
-                return
-
-            print(
-                "===========\n{}\n@@@@@ => previous is\n{}\n===========\n".format(
-                    current.current,
-                    previous.current
-                )
-            )
-
-        self.graph.traversal(self.graph.starting_nodes, print_plan)
-
-    def printTuple(self):
-        def print_plan(current: NodeVec, previous: NodeVec):
-            if current is None:
-                print("this is source")
-                print(previous.current)
-                return
-            if previous is None:
-                print("this is sink")
-                print(current.current)
-                return
-
-            print(
-                "############\n{}\n@@@@@ => previous is\n{}\n############\n"
-                    .format(
-                    current.current,
-                    previous.current
-                )
-            )
-
-        self.graph.traversal(self.graph.starting_nodes, print_plan)
diff --git a/python/src/pywy/core/plugin.py b/python/src/pywy/core/plugin.py
deleted file mode 100644
index b79cbf56..00000000
--- a/python/src/pywy/core/plugin.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-from typing import Set
-
-from pywy.core.executor import Executor
-from pywy.core.platform import Platform
-from pywy.core.mapping import Mapping
-
-
-class Plugin:
-    """
-    A plugin contributes the following components to a :class:`Context`
-    - mappings
-    - channels
-    - configurations
-    In turn, it may require several :clas:`Platform`s for its operation.
-    """
-
-    platforms: Set[Platform]
-    mappings: Mapping
-
-    def __init__(self, platforms: Set[Platform], mappings: Mapping = Mapping()):
-        self.platforms = platforms
-        self.mappings = mappings
-
-    def get_mappings(self) -> Mapping:
-        return self.mappings
-
-    def get_executor(self) -> Executor:
-        pass
-
-    def __str__(self):
-        return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
-
-    def __repr__(self):
-        return self.__str__()
diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py
deleted file mode 100644
index 7c0b99be..00000000
--- a/python/src/pywy/core/translator.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-from pywy.graph.types import (WGraphOfVec, NodeVec)
-from pywy.core.plugin import Plugin
-from pywy.core.plan import PywyPlan
-from pywy.core.mapping import Mapping
-
-
-class Translator:
-
-    plugin: Plugin
-    plan: PywyPlan
-
-    def __init__(self, plugin: Plugin, plan: PywyPlan):
-        self.plugin = plugin
-        self.plan = plan
-
-    def translate(self):
-        mappings: Mapping = self.plugin.get_mappings()
-        graph = WGraphOfVec(self.plan.sinks)
-
-        def translate2plugin(current_op: NodeVec, next_op: NodeVec):
-            if current_op is None:
-                return
-
-            if current_op.current[1] is None:
-                current_op.current[1] = mappings.get_instanceof(current_op.current[0])
-
-            if next_op is None:
-                return
-            if next_op.current[1] is None:
-                next_op.current[1] = mappings.get_instanceof(next_op.current[0])
-
-            # TODO not necesary it it 0
-            current_op.current[1].connect(0, next_op.current[1], 0)
-
-        graph.traversal(graph.starting_nodes, translate2plugin)
-
-        node = []
-        for elem in graph.starting_nodes:
-            node.append(elem.current[1])
-
-        return PywyPlan({self.plugin}, node)
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index 6e811cb1..c0836dfb 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -17,12 +17,10 @@
 
 from typing import Set, List, cast
 
-from pywy.core import Translator
+from pywy.core.core import Plugin, PywyPlan
 from pywy.operators.base import PO_T
 from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out)
 from pywy.operators import *
-from pywy.core import PywyPlan
-from pywy.core import Plugin
 
 
 class WayangContext:
@@ -94,13 +92,7 @@ class DataQuanta(GenericTco):
                 )
             )
         ]
-        plan = PywyPlan(self.context.plugins, last)
-
-        plug = self.context.plugins.pop()
-        trs: Translator = Translator(plug, plan)
-        new_plan = trs.translate()
-        plug.get_executor().execute(new_plan)
-        # TODO add the logic to execute the plan
+        PywyPlan(self.context.plugins, last).execute()
 
     def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:
         self.operator.connect(0, op, port_op)
diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py
index 59aabf2a..14620c40 100644
--- a/python/src/pywy/operators/base.py
+++ b/python/src/pywy/operators/base.py
@@ -38,7 +38,9 @@ class PywyOperator:
                  input_type: TypeVar = None,
                  output_type: TypeVar = None,
                  input_length: Optional[int] = 1,
-                 output_length: Optional[int] = 1
+                 output_length: Optional[int] = 1,
+                 *args,
+                 **kwargs
                  ):
         self.name = (self.prefix() + name + self.postfix()).strip()
         self.inputSlot = [input_type]
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/context.py
similarity index 67%
copy from python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
copy to python/src/pywy/platforms/jvm/context.py
index 0049c5c2..100016dd 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
+++ b/python/src/pywy/platforms/jvm/context.py
@@ -15,19 +15,19 @@
 #  limitations under the License.
 #
 
-from typing import List, Type
-
-from pywy.core.channel import CH_T
-from pywy.operators.base import PywyOperator
+from pywy.core.core import TranslateContext
+from pywy.platforms.jvm.serializable.plan_writter import PlanWritter
 from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
 
 
-class JVMExecutionOperator(PywyOperator):
+class JVMTranslateContext(TranslateContext):
+    plan_writer: PlanWritter
 
-    dispatch_operator: WayangJVMOperator
+    def __init__(self):
+        self.plan_writer = PlanWritter()
 
-    def prefix(self) -> str:
-        return 'JVM'
+    def add_operator(self, op: WayangJVMOperator):
+        self.plan_writer.add_operator(op)
 
-    def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
-        pass
+    def generate_request(self):
+        self.plan_writer.send_message_to_wayang()
diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py
index 2dad76d6..b64973ef 100644
--- a/python/src/pywy/platforms/jvm/execution.py
+++ b/python/src/pywy/platforms/jvm/execution.py
@@ -20,6 +20,8 @@ from pywy.core import PywyPlan
 from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR
 from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch
 from pywy.platforms.jvm.operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
+
 
 class JVMExecutor(Executor):
 
@@ -84,16 +86,8 @@ class JVMExecutor(Executor):
 
         graph.traversal(graph.starting_nodes, execute)
 
+        magic: JVMExecutionOperator = graph.starting_nodes[0].current
 
-
-        starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator
-        while starting.previous[0]:
-            print(starting)
-            #print(starting.nexts[0])
-            starting = starting.previous[0]
-            if len(starting.previous) == 0 :
-                break
-        print(starting)
-
+        magic.translate_context.generate_request()
 
 
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
index 0049c5c2..8bc137a7 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
@@ -19,6 +19,7 @@ from typing import List, Type
 
 from pywy.core.channel import CH_T
 from pywy.operators.base import PywyOperator
+from pywy.platforms.jvm.context import JVMTranslateContext
 from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
 
 
@@ -26,6 +27,20 @@ class JVMExecutionOperator(PywyOperator):
 
     dispatch_operator: WayangJVMOperator
 
+    translate_context: JVMTranslateContext
+
+    def set_context(self, **kwargs):
+        if 'translate_context' not in kwargs:
+            return
+        self.translate_context = kwargs['translate_context']
+
+    def close_operator(self, op: WayangJVMOperator):
+        if self.translate_context is None:
+            return
+
+        self.translate_context.add_operator(op)
+
+
     def prefix(self) -> str:
         return 'JVM'
 
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
index 555ba048..a047de41 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
@@ -27,11 +27,12 @@ from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFil
 
 class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
 
-    def __init__(self, origin: TextFileSink = None):
+    def __init__(self, origin: TextFileSink = None, **kwargs):
         path = None if origin is None else origin.path
         type_class = None if origin is None else origin.inputSlot[0]
         end_line = None if origin is None else origin.end_line
         super().__init__(path, type_class, end_line)
+        self.set_context(**kwargs)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
@@ -45,6 +46,9 @@ class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
 
             operator.connect_to(0, sink, 0)
 
+            self.close_operator(operator)
+            self.close_operator(sink)
+
             self.dispatch_operator = sink
         else:
             raise Exception("Channel Type does not supported")
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
index 3ca3b911..030c702f 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
@@ -21,15 +21,15 @@ from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.source import TextFileSource
 from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
 from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
-from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource, WayangJVMOperator
 
 
 class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator):
 
-    def __init__(self, origin: TextFileSource = None):
+    def __init__(self, origin: TextFileSource = None, **kwargs):
         path = None if origin is None else origin.path
         super().__init__(path)
-        pass
+        self.set_context(**kwargs)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
index 06ba9e16..90da7f84 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
@@ -29,10 +29,10 @@ from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappart
 
 class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
 
-    def __init__(self, origin: FilterOperator = None):
+    def __init__(self, origin: FilterOperator = None, **kwargs):
         predicate = None if origin is None else origin.predicate
         super().__init__(predicate)
-        pass
+        self.set_context(**kwargs)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
@@ -60,6 +60,7 @@ class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
             current: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name)
             # TODO check for the case where the index matter
             op.connect_to(0, current, 0)
+            self.close_operator(op)
             py_out_dispatch_channel.accept_dispatchable(current)
 
         else:
diff --git a/python/src/pywy/platforms/jvm/plugin.py b/python/src/pywy/platforms/jvm/plugin.py
index eff1e04b..eb52b7af 100644
--- a/python/src/pywy/platforms/jvm/plugin.py
+++ b/python/src/pywy/platforms/jvm/plugin.py
@@ -16,7 +16,8 @@
 #
 
 from pywy.core import Executor
-from pywy.core import Plugin
+from pywy.core.core import Plugin
+from pywy.platforms.jvm.context import JVMTranslateContext
 from pywy.platforms.jvm.execution import JVMExecutor
 from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS
 from pywy.platforms.jvm.platform import JVMPlatform
@@ -25,7 +26,7 @@ from pywy.platforms.jvm.platform import JVMPlatform
 class JVMPlugin(Plugin):
 
     def __init__(self):
-        super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS)
+        super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS, JVMTranslateContext())
 
     def get_executor(self) -> Executor:
         return JVMExecutor()
diff --git a/python/src/pywy/platforms/jvm/serializable/plan_writter.py b/python/src/pywy/platforms/jvm/serializable/plan_writter.py
new file mode 100644
index 00000000..17929181
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/plan_writter.py
@@ -0,0 +1,131 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Set
+
+import pywy.platforms.jvm.serializable.pywayangplan_pb2 as pwb
+import os
+import cloudpickle
+import logging
+import requests
+import base64
+
+
+# Writes Wayang Plan from several stages
+from pywy.exception import PywyException
+from pywy.operators import SinkOperator
+from pywy.operators.source import SourceUnaryOperator
+from pywy.operators.unary import UnaryToUnaryOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMTextFileSink
+
+
+class PlanWritter:
+
+    def __init__(self):
+        self.originals: Set[WayangJVMOperator] = set()
+
+    def add_operator(self, operator: WayangJVMOperator):
+        self.originals.add(operator)
+
+    def add_proto_unary_operator(self, operator: WayangJVMOperator):
+        op = pwb.OperatorProto()
+        op.id = str(operator.name)
+        op.type = operator.kind
+        op.udf = cloudpickle.dumps(operator.udf)
+        op.path = str(None)
+        return op
+
+    def add_proto_source_operator(self, operator: WayangJVMOperator):
+        source = pwb.OperatorProto()
+        source.id = str(operator.name)
+        source.type = operator.kind
+        source.path = os.path.abspath(operator.path)
+        source.udf = chr(0).encode('utf-8')
+        return source
+
+    def add_proto_sink_operator(self, operator: WayangJVMOperator):
+        sink = pwb.OperatorProto()
+        sink.id = str(operator.name)
+        sink.type = operator.kind
+        sink.path = os.path.abspath(operator.path)
+        sink.udf = chr(0).encode('utf-8')
+        return sink
+
+    def send_message_to_wayang(self):
+        connections = {}
+        sources = []
+        sinks = []
+        operators = []
+        for operator in self.originals:
+            if not operator.is_unary():
+                raise PywyException(
+                    "the not unary operator are not supported".format(
+                        type(operator),
+                        operator
+                    )
+                )
+            if operator.is_operator():
+                connections[operator] = self.add_proto_unary_operator(operator)
+                operators.append(connections[operator])
+            elif operator.is_source():
+                connections[operator] = self.add_proto_source_operator(operator)
+                sources.append(connections[operator])
+            elif operator.is_sink():
+                connections[operator] = self.add_proto_sink_operator(operator)
+                sinks.append(connections[operator])
+            else:
+                raise PywyException(
+                    "the type {} for the operator {} is not supported {}".format(
+                        type(operator),
+                        operator,
+                        WayangJVMTextFileSink.mro()
+                    )
+                )
+        for operator in self.originals:
+            current = connections[operator]
+            for ele in operator.previous:
+                current.predecessors.append(connections.get(ele).id)
+            for ele in operator.nexts:
+                current.successors.append(connections.get(ele).id)
+
+        plan_configuration = pwb.WayangPlanProto()
+
+        plan = pwb.PlanProto()
+        plan.sources.extend(sources)
+        plan.operators.extend(operators)
+        plan.sinks.extend(sinks)
+        plan.input = pwb.PlanProto.string
+        plan.output = pwb.PlanProto.string
+
+        ctx = pwb.ContextProto()
+        ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+
+        plan_configuration.plan.CopyFrom(plan)
+        plan_configuration.context.CopyFrom(ctx)
+
+        print("plan!")
+        print(plan_configuration)
+
+        msg_bytes = plan_configuration.SerializeToString()
+        msg_64 = base64.b64encode(msg_bytes)
+
+        logging.debug(msg_bytes)
+        # response = requests.get("http://localhost:8080/plan/create/fromfile")
+        data = {
+            'message': msg_64
+        }
+        response = requests.post("http://localhost:8080/plan/create", data)
+        logging.debug(response)
diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
index d18f155b..ab3a1a4e 100644
--- a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
+++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
@@ -71,6 +71,18 @@ class WayangJVMOperator:
             self.nexts
         )
 
+    def is_source(self):
+        return False
+
+    def is_sink(self):
+        return False
+
+    def is_unary(self):
+        return True
+
+    def is_operator(self):
+        return False
+
 WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator)
 
 
@@ -80,6 +92,8 @@ class WayangJVMMappartitionOperator(WayangJVMOperator):
         super().__init__("MapPartitionOperator", name)
         self.udf = udf
 
+    def is_operator(self):
+        return True
 
 class WayangJVMTextFileSource(WayangJVMOperator):
 
@@ -87,6 +101,8 @@ class WayangJVMTextFileSource(WayangJVMOperator):
         super().__init__("TextFileSource", name)
         self.path = path
 
+    def is_source(self):
+        return True
 
 class WayangJVMTextFileSink(WayangJVMOperator):
 
@@ -94,3 +110,5 @@ class WayangJVMTextFileSink(WayangJVMOperator):
         super().__init__("TextFileSink", name)
         self.path = path
 
+    def is_sink(self):
+        return True
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 7d8eec1b..bccab733 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -28,7 +28,7 @@ from pywy.platforms.python.channels import (
 
 class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
 
-    def __init__(self, origin: TextFileSink = None):
+    def __init__(self, origin: TextFileSink = None, **kwargs):
         path = None if origin is None else origin.path
         type_class = None if origin is None else origin.inputSlot[0]
         end_line = None if origin is None else origin.end_line
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 245d090d..f5651e79 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -28,7 +28,7 @@ from pywy.platforms.python.channels import (
 
 class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
 
-    def __init__(self, origin: TextFileSource = None):
+    def __init__(self, origin: TextFileSource = None, **kwargs):
         path = None if origin is None else origin.path
         super().__init__(path)
         pass
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 2d807282..200f85f1 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -32,7 +32,7 @@ from pywy.platforms.python.channels import (
 
 class PyFilterOperator(FilterOperator, PyExecutionOperator):
 
-    def __init__(self, origin: FilterOperator = None):
+    def __init__(self, origin: FilterOperator = None, **kwargs):
         predicate = None if origin is None else origin.predicate
         super().__init__(predicate)
         pass
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 72016a8c..43a02465 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -33,7 +33,7 @@ from pywy.platforms.python.channels import (
 
 class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
 
-    def __init__(self, origin: FlatmapOperator = None):
+    def __init__(self, origin: FlatmapOperator = None, **kwargs):
         fm_function = None if origin is None else origin.fm_function
         super().__init__(fm_function)
 
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index a8e53a48..54418d46 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -32,7 +32,7 @@ from pywy.platforms.python.channels import (
 
 class PyMapOperator(MapOperator, PyExecutionOperator):
 
-    def __init__(self, origin: MapOperator = None):
+    def __init__(self, origin: MapOperator = None, **kwargs):
         function = None if origin is None else origin.function
         super().__init__(function)
         pass
diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py
index 987e7334..c3bb3e39 100644
--- a/python/src/pywy/tests/integration/jvm_platform_test.py
+++ b/python/src/pywy/tests/integration/jvm_platform_test.py
@@ -80,7 +80,6 @@ class TestIntegrationJVMPlatform(unittest.TestCase):
         self.assertEqual(lines_filter, lines_platform)
 
     def test_grep(self):
-
         dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
 
         dq.store_textfile(path_tmp)

Reply via email to