This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit edab66a194ec90f0ac948255ee681cc7d098ee90 Author: Rodrigo Pardo Meza <[email protected]> AuthorDate: Mon Mar 21 14:03:00 2022 +0100 Wayang 8 (#89) * [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor * [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection * [WAYANG-8][API-PYTHON] POM fixes plus minor test * [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled * [WAYANG-8][API-PYTHON] Writing from Java to Python. Not taking into care about Iterator Datatypes. * [WAYANG-8][API-PYTHON] Java Socket Writter improvements * [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included * [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream * [WAYANG-8][API-PYTHON] Getting results from Python and continue processing * [WAYANG-8][API-PYTHON] Config files for pywayang * [WAYANG-8][API-PYTHON] Structures to save the plan with functional fashion plus most basic operators * [WAYANG-8][API-PYTHON] Main program to test plan executions locally * [WAYANG-8][API-PYTHON] Minor comments and TODOs * [WAYANG-8][API-PYTHON] Most basic test for protobuff communication with java * [WAYANG-8][API-PYTHON] Addjacency list from PyWayang Plan * [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern * [WAYANG-8][API-PYTHON] Protobuf python message generator * [WAYANG-8][API-PYTHON] Wayang Web Service project structure * [WAYANG-8][API-PYTHON] Protobuf message generation fixes * [WAYANG-8][API-PYTHON] Wayang Web Service executes most basic plans directly * [WAYANG-8][API-PYTHON] Receiving Base64 passing to byte array and unpickling * [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF * [WAYANG-8][API-PYTHON] New test with single UDF * [WAYANG-8][API-PYTHON] Protobuf command * [WAYANG-8][API-PYTHON] Protobuf message template updated * [WAYANG-8][API-PYTHON] POM fixes * [WAYANG-8][API-PYTHON] License comments added * [WAYANG-8][API-PYTHON] 
Correction on missing licenses * [WAYANG-8][API-PYTHON] Serializable module creation * [WAYANG-8][API-PYTHON] adding protoc to travis * [WAYANG-8][API-PYTHON] protoc executable path correction * [WAYANG-8][API-PYTHON] Commenting objc_class_prefix * [WAYANG-8][API-PYTHON] Obtaining pipelines * [WAYANG-8][API-PYTHON] Dataquanta writing message * [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments * [WAYANG-8][API-PYTHON] Operator Python executable indicator * [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets * [WAYANG-8][API-PYTHON] New version of Wayang protobuf message * [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions * [WAYANG-8][API-PYTHON] More test programs * [WAYANG-8][API-PYTHON] Commentaries and logging for Graph module * [WAYANG-8][API-PYTHON] Commentaries and logging for Orchestrator module * [WAYANG-8][API-PYTHON] Commentaries and logging for Protobuf module * [WAYANG-8][API-PYTHON] Fix usage of relative paths * [WAYANG-8][API-PYTHON] Scripts to compile protobuf has been deleted. Now Maven executes them * [WAYANG-8][API-PYTHON] Execution Log configuration * [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator * [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan * [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor * [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan with Spark Execution * [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64 * [WAYANG-8][API-PYTHON] New Operators Flatmap group by, reduce and Reduce By Key. Only Python Side. 
* [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators * [WAYANG-8][API-PYTHON] Adding TPC-H 1st Test * [WAYANG-8][API-PYTHON] Last changes, not working * [WAYANG-8] Fixing errors with dependencies * [WAYANG-8] Fix to Pom versions problem * [WAYANG-8] Protoc path updated * [WAYANG-8] Correction in the pom.xml for flags Signed-off-by: bertty <[email protected]> Co-authored-by: Bertty Contreras-Rojas <[email protected]> Signed-off-by: bertty <[email protected]> --- pom.xml | 2 + pywayang/config/__init__.py | 20 +++ pywayang/config/config_reader.py | 51 ++++++ pywayang/config/pywayang_config.ini | 38 ++++ pywayang/graph/__init__.py | 19 ++ pywayang/graph/graph.py | 71 ++++++++ pywayang/graph/node.py | 48 +++++ pywayang/graph/traversal.py | 51 ++++++ pywayang/graph/visitant.py | 52 ++++++ pywayang/orchestrator/__init__.py | 20 +++ pywayang/orchestrator/dataquanta.py | 330 ++++++++++++++++++++++++++++++++++ pywayang/orchestrator/execdirectly.py | 162 +++++++++++++++++ pywayang/orchestrator/main.py | 173 ++++++++++++++++++ pywayang/orchestrator/operator.py | 121 +++++++++++++ pywayang/orchestrator/plan.py | 52 ++++++ pywayang/protobuf/__init__.py | 18 ++ pywayang/protobuf/old_planwriter.py | 308 +++++++++++++++++++++++++++++++ pywayang/protobuf/planwriter.py | 277 ++++++++++++++++++++++++++++ pywayang/test/demo_testing.py | 30 ++++ pywayang/test/full_java_test.py | 69 +++++++ pywayang/test/full_spark_test.py | 67 +++++++ 21 files changed, 1979 insertions(+) diff --git a/pom.xml b/pom.xml index 3ff48f95..e0ac2bda 100644 --- a/pom.xml +++ b/pom.xml @@ -1248,6 +1248,8 @@ <exclude>**/README.md</exclude> <exclude>**/general-todos.md</exclude> <exclude>**/scala_1*</exclude> + + <exclude>**/*pb2.py</exclude> </excludes> </configuration> </plugin> diff --git a/pywayang/config/__init__.py b/pywayang/config/__init__.py new file mode 100644 index 00000000..008475c2 --- /dev/null +++ b/pywayang/config/__init__.py @@ -0,0 +1,20 @@ +# 
+# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from config.config_reader import get_source_types +from config.config_reader import get_sink_types +from config.config_reader import get_boundary_types diff --git a/pywayang/config/config_reader.py b/pywayang/config/config_reader.py new file mode 100644 index 00000000..c8f58732 --- /dev/null +++ b/pywayang/config/config_reader.py @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import configparser
import os

# Path is relative to the process working directory; scripts are expected to
# run from a sibling directory such as pywayang/orchestrator -- TODO confirm.
_CONFIG_PATH = "../config/pywayang_config.ini"


def _read_section_values(section):
    """Return the values of *section* from the pywayang config file.

    configparser merges the [DEFAULT] section ("variable_to_access") into
    every section, so those inherited keys are dropped before returning.

    :param section: name of the INI section to read.
    :return: a dict-values view of the section's own entries.
    :raises configparser.NoSectionError: if the file or section is missing.
    """
    config = configparser.ConfigParser()
    config.read(_CONFIG_PATH)
    entries = dict(config.items(section))
    # Use a default so a config file without [DEFAULT] does not raise.
    entries.pop("variable_to_access", None)
    return entries.values()


def get_boundary_types():
    """Operator types that mark a boundary between execution pipelines."""
    return _read_section_values('BOUNDARY_TYPES')


def get_source_types():
    """Operator types that act as plan sources."""
    return _read_section_values('SOURCE_TYPES')


def get_sink_types():
    """Operator types that act as plan sinks."""
    return _read_section_values('SINK_TYPES')
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +[DEFAULT] +variable_to_access = value + +[INPUT] +txnname_mod = string1 +txnmemo_mod = string2 + +[MODIFY] +txnname_mod = string3 +txnmemo_mod = string4 + +[BOUNDARY_TYPES] +boundary_type_1 = union + +[SOURCE_TYPES] +source_type_1 = source +source_type_2 = text + +[SINK_TYPES] +sink_type_1 = sink +sink_type_2 = sonk \ No newline at end of file diff --git a/pywayang/graph/__init__.py b/pywayang/graph/__init__.py new file mode 100644 index 00000000..17e2deb5 --- /dev/null +++ b/pywayang/graph/__init__.py @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import graph.graph +import graph.node \ No newline at end of file diff --git a/pywayang/graph/graph.py b/pywayang/graph/graph.py new file mode 100644 index 00000000..be7a32f0 --- /dev/null +++ b/pywayang/graph/graph.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
import abc
import logging


class Element(metaclass=abc.ABCMeta):
    """Visitable element of the plan graph (visitor pattern)."""

    @abc.abstractmethod
    def accept(self, visitor, udf, orientation, last_iter):
        pass


# Describes an Operator in the Graph
class Node(Element):
    def __init__(self, operator_type, id, operator):
        self.operator_type = operator_type
        self.id = id
        self.predecessors = {}  # parent node id -> edge weight
        self.successors = {}    # child node id -> edge weight
        self.python_exec = operator.python_exec

        # Temporal: keep a handle on the wrapped operator itself.
        self.operator = operator

    def add_predecessor(self, id_parent, e):
        self.predecessors[id_parent] = e

    def add_successor(self, id_child, e):
        self.successors[id_child] = e

    # Nodes are visited by objects of class Visitant.
    # Visitants are being used to execute a UDF through the Graph.
    def accept(self, visitor, udf, orientation, last_iter):
        visitor.visit_node(self, udf, orientation, last_iter)


# Adjacency list used to analyse the plan
class Graph:
    def __init__(self):
        self.graph = {}     # node id -> Node
        self.nodes_no = 0   # number of registered nodes
        self.nodes = []     # node ids in insertion order

    # Fills the Graph
    def populate(self, sinks):
        """Build the graph by walking upwards from every sink operator."""
        for sink in sinks:
            self.process_operator(sink)

    # Add current operator and set dependencies
    def process_operator(self, operator):
        """Register *operator* and recursively link all of its ancestors."""
        self.add_node(operator.operator_type, operator.id, operator)
        for parent in operator.previous:
            if parent:
                self.add_node(parent.operator_type, parent.id, parent)
                self.add_link(operator.id, parent.id, 1)
                self.process_operator(parent)

    def add_node(self, name, id, operator):
        """Register a node once; repeated ids are ignored."""
        if id in self.nodes:
            return
        self.nodes_no += 1
        self.nodes.append(id)
        self.graph[id] = Node(name, id, operator)

    def add_link(self, id_child, id_parent, e):
        """Create a bidirectional parent/child link with weight *e*."""
        # Both endpoints must already be registered.
        if id_child in self.nodes and id_parent in self.nodes:
            self.graph[id_child].add_predecessor(id_parent, e)
            self.graph[id_parent].add_successor(id_child, e)

    def print_adjlist(self):
        """Log the adjacency list (debugging aid).

        BUG FIX: logging.debug() is not print(); extra positional arguments
        are %-style format args, so the original comma-separated calls never
        rendered the values. Use explicit %s placeholders instead.
        """
        for key in self.graph:
            logging.debug("Node: %s - %s", self.graph[key].operator_type, key)
            for key2 in self.graph[key].predecessors:
                logging.debug("- Parent: %s - %s - %s",
                              self.graph[key2].operator_type,
                              self.graph[key].predecessors[key2], key2)
            for key2 in self.graph[key].successors:
                logging.debug("- Child: %s - %s - %s",
                              self.graph[key2].operator_type,
                              self.graph[key].successors[key2], key2)

    def get_node(self, id):
        return self.graph[id]
from graph.visitant import Visitant
import logging


# Defines how a UDF will be applied over the Graph
class Traversal:
    """Walks the plan graph from a set of origin operators, applying a UDF."""

    def __init__(self, graph, origin, udf):
        self.graph = graph
        self.origin = origin
        self.udf = udf
        self.app = Visitant(graph, [])

        # The walk direction depends on where it starts: sources walk
        # towards successors, sinks towards predecessors.
        first = origin[0]
        if first.source:
            self.orientation = "successors"
        elif first.sink:
            self.orientation = "predecessors"
        else:
            logging.error("Origin point to traverse the plan wrongly defined")
            return

        for start_op in origin:
            logging.debug("operator origin: " + str(start_op.id))
            start_node = graph.get_node(start_op.id)
            self.app.visit_node(
                node=start_node,
                udf=self.udf,
                orientation=self.orientation,
                last_iter=None,
            )

    def get_collected_data(self):
        """Results accumulated by the visitor while walking the graph."""
        return self.app.get_collection()
import abc
import logging


class Visitor(metaclass=abc.ABCMeta):
    """Interface for objects that apply a UDF to graph nodes."""

    @abc.abstractmethod
    def visit_node(self, node, udf, orientation, last_iter):
        pass


# Applies a UDF in current Node
class Visitant(Visitor):

    def __init__(self, graph, results):
        # `results` is shared with the caller; the UDF may append to it.
        self.collection = results
        self.graph = graph

    # The UDF can store results in self.collection whenever it requires.
    # `last_iter` carries the value produced by the previous iteration.
    def visit_node(self, node, udf, orientation, last_iter):
        """Apply *udf* to *node*, then recurse along *orientation*.

        BUG FIX: log-message typo ("UDf") corrected; `getattr` is computed
        once instead of twice; lazy %s logging args replace eager string
        concatenation; dead trailing `pass` removed.
        """
        logging.debug("Applying UDF %s", orientation)
        current_value = udf(node, last_iter, self.collection)
        # `orientation` names the dict to follow: predecessors or successors.
        next_iter = getattr(node, orientation)
        logging.debug("orientation result %s", next_iter)
        for next_iter_id in next_iter:
            if next_iter_id:
                logging.debug("next_id: %s", next_iter_id)
                next_iter_node = self.graph.get_node(next_iter_id)
                logging.debug("next_iter_node: %s %s",
                              next_iter_node.operator_type, next_iter_node.id)
                next_iter_node.accept(visitor=self, udf=udf,
                                      orientation=orientation,
                                      last_iter=current_value)

    def get_collection(self):
        return self.collection
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import orchestrator.plan +import orchestrator.dataquanta +import graph.graph diff --git a/pywayang/orchestrator/dataquanta.py b/pywayang/orchestrator/dataquanta.py new file mode 100644 index 00000000..7d700eb5 --- /dev/null +++ b/pywayang/orchestrator/dataquanta.py @@ -0,0 +1,330 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from orchestrator.operator import Operator
from graph.graph import Graph
from graph.traversal import Traversal
from protobuf.planwriter import MessageWriter
import itertools
import collections
import logging
from functools import reduce
import operator


# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
    def __init__(self, descriptor):
        # Descriptor collects plan-wide metadata (sources, sinks, plugins).
        self.descriptor = descriptor

    def source(self, source):
        """Create the initial DataQuanta from a file path or any iterable."""
        if type(source) is str:
            # NOTE(review): the handle is deliberately left open so the
            # iterator stays consumable downstream -- confirm lifecycle.
            source_ori = open(source, "r")
        else:
            source_ori = source
        return DataQuanta(
            Operator(
                operator_type="source",
                udf=source,
                iterator=iter(source_ori),
                previous=[],
                python_exec=False
            ),
            descriptor=self.descriptor
        )


# Wraps an operation over an iterable
class DataQuanta:
    def __init__(self, operator=None, descriptor=None):
        self.operator = operator
        self.descriptor = descriptor
        # Register plan endpoints with the shared descriptor.
        if self.operator.is_source():
            self.descriptor.add_source(self.operator)
        if self.operator.is_sink():
            self.descriptor.add_sink(self.operator)

    # Operational Functions
    def filter(self, udf):
        """Keep only the elements for which *udf* is truthy."""
        def func(iterator):
            return filter(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="filter",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def flatmap(self, udf):
        """Map *udf* over the input and flatten one level of nesting."""
        def func(iterator):
            # One lazy C-level pass; replaces the previous hand-rolled
            # generator plus an unused duplicate helper.
            return itertools.chain.from_iterable(map(udf, iterator))

        return DataQuanta(
            Operator(
                operator_type="flatmap",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def group_by(self, udf):
        """Group consecutive elements by their first component."""
        def func(iterator):
            # TODO key should be given by "udf"
            return itertools.groupby(iterator, key=operator.itemgetter(0))

        return DataQuanta(
            Operator(
                operator_type="group_by",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def map(self, udf):
        """Apply *udf* to every element."""
        def func(iterator):
            return map(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="map",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # Key specifies pivot dimensions
    # UDF specifies reducer function
    def reduce_by_key(self, keys, udf):
        op = Operator(
            operator_type="reduce_by_key",
            udf=udf,
            previous=[self.operator],
            python_exec=False
        )

        # Was a stray print(); keep the information but route it through
        # the logging framework like the rest of the module.
        logging.debug("reduce_by_key keys (%d): %s", len(keys), keys)
        for i in range(0, len(keys)):
            # TODO maybe would be better just leave the number as key
            op.set_parameter("dimension|" + str(i + 1), keys[i])

        return DataQuanta(
            op,
            descriptor=self.descriptor
        )

    def reduce(self, udf):
        """Fold the whole input with the binary function *udf*."""
        def func(iterator):
            return reduce(udf, iterator)

        return DataQuanta(
            Operator(
                operator_type="reduce",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def sink(self, path, end="\n"):
        """Terminal operator writing every element to *path*."""
        def consume(iterator):
            with open(path, 'w') as f:
                for x in iterator:
                    f.write(str(x) + end)

        # Kept for direct (debug) execution; the deployed plan sends the
        # path itself to the Java side instead.
        def func(iterator):
            consume(iterator)

        return DataQuanta(
            Operator(
                operator_type="sink",
                udf=path,
                # To execute directly uncomment
                # udf=func,
                previous=[self.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def sort(self, udf):
        """Sort the whole input using *udf* as the key function."""
        def func(iterator):
            return sorted(iterator, key=udf)

        return DataQuanta(
            Operator(
                operator_type="sort",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # This function allows the union to be performed by Python.
    # Nevertheless, the current configuration runs it over Java.
    def union(self, other):
        def func(iterator):
            return itertools.chain(iterator, other.operator.getIterator())

        return DataQuanta(
            Operator(
                operator_type="union",
                udf=func,
                previous=[self.operator, other.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def __run(self, consumer):
        consumer(self.operator.getIterator())

    # Execution Functions
    def console(self, end="\n"):
        """Print every element to stdout (debug helper)."""
        def consume(iterator):
            for x in iterator:
                print(x, end=end)

        self.__run(consume)

    # Only for debugging purposes!
    # To execute the plan directly in the program driver.
    def execute(self):
        # logging.warn is a deprecated alias; use warning().
        logging.warning("DEBUG Execution")
        logging.info("Reminder to swap SINK UDF value from path to func")
        logging.debug(self.operator.previous[0].operator_type)
        if self.operator.is_sink():
            logging.debug(self.operator.operator_type)
            logging.debug(self.operator.udf)
            logging.debug(len(self.operator.previous))
            self.operator.udf(self.operator.previous[0].getIterator())
        else:
            logging.error("Plan must call execute from SINK type of operator")
            raise RuntimeError

    # Converts Python Functional Plan to valid Wayang Plan
    def to_wayang_plan(self):
        sinks = self.descriptor.get_sinks()
        if len(sinks) == 0:
            return

        graph = Graph()
        graph.populate(self.descriptor.get_sinks())

        # Uncomment to check the Graph built
        # graph.print_adjlist()

        # Function to be consumed by Traversal.
        # Separates the Python plan into a list of pipelines.
        def define_pipelines(node1, current_pipeline, collection):
            def store_unique(pipe_to_insert):
                for pipe in collection:
                    if equivalent_lists(pipe, pipe_to_insert):
                        return
                collection.append(pipe_to_insert)

            def equivalent_lists(l1, l2):
                return collections.Counter(l1) == collections.Counter(l2)

            if not current_pipeline:
                current_pipeline = [node1]
            elif node1.operator.is_boundary():
                # A boundary closes the current pipeline and starts a new one.
                store_unique(current_pipeline.copy())
                current_pipeline.clear()
                current_pipeline.append(node1)
            else:
                current_pipeline.append(node1)

            if node1.operator.sink:
                store_unique(current_pipeline.copy())
                current_pipeline.clear()

            return current_pipeline

        # Works over the graph.
        # The UDF always receives:
        #   x: a Node object,
        #   y: the result of the previous iteration,
        #   z: a collection to store final results.
        trans = Traversal(
            graph=graph,
            origin=self.descriptor.get_sources(),
            udf=lambda x, y, z: define_pipelines(x, y, z)
        )

        # Gets the results of the traverse process.
        collected_stages = trans.get_collected_data()

        # Passing the stages to a Wayang message writer.
        # Each stage is composed of class Node objects.
        writer = MessageWriter()
        for stage_no, stage in enumerate(collected_stages, start=1):
            logging.info("///")
            logging.info("stage" + str(stage_no))
            writer.process_pipeline(stage)

        writer.set_dependencies()

        # Uses a file to provide the plan
        # writer.write_message(self.descriptor)

        # Send the plan to the Wayang REST api directly.
        writer.send_message(self.descriptor)
from orchestrator.plan import Descriptor
from orchestrator.dataquanta import DataQuantaBuilder
import datetime


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_sort(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .sort(lambda elem: elem.lower()) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_sort_filter(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .sort(lambda elem: elem.lower()) \
            .filter(lambda elem: str(elem).startswith("f")) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_filter_text(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/words.txt") \
            .filter(lambda elem: str(elem).startswith("f")) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_filter(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/numbers.txt") \
            .filter(lambda elem: int(elem) % 2 != 0) \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_basic(descriptor):
    plan = DataQuantaBuilder(descriptor)
    sink_dataquanta = \
        plan.source("../test/lines.txt") \
            .sink("../test/output.txt", end="")
    return sink_dataquanta


# Returns the Sink Executable Dataquanta of a DEMO plan
def plan_junction(descriptor):
    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt") \
        .filter(lambda elem: str(elem).startswith("I"))
    dq_source_c = plan.source("../test/lastlines.txt") \
        .filter(lambda elem: str(elem).startswith("W"))

    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .union(dq_source_c) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")
    return sink_dataquanta


# Junction plan whose union runs on the Java side
def plan_java_junction(descriptor):
    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .filter(lambda elem: str(elem).startswith("I")) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")
    return sink_dataquanta


# First (partial) version of TPC-H query 1
def plan_tpch_q1(descriptor):
    # TODO create reduce by; `reducer` below is intended for the pending
    # group_by / reduce_by steps of the query.
    plan = DataQuantaBuilder(descriptor)

    def reducer(obj1, obj2):
        return obj1[0]

    # lineitem column indexes used below:
    # L_RETURNFLAG 8, L_LINESTATUS 9, L_QUANTITY 4, L_EXTENDEDPRICE 5,
    # discount 6, tax 7
    sink = plan.source("../test/lineitem.txt") \
        .map(lambda elem: elem.split("|")) \
        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d')
                <= datetime.datetime.strptime("1998-09-02", '%Y-%m-%d')) \
        .map(lambda elem:
             [elem[8], elem[9], elem[4], elem[5],
              float(elem[5]) * (1 - float(elem[6])),
              float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
              elem[4], elem[5],
              elem[6], 1]) \
        .sink("../test/output.txt", end="")

    # BUG FIX: the original returned `dq_source_b`, a name that does not
    # exist in this function (NameError); return the built sink instead.
    return sink


def plan_full_java(descriptor):
    plan = DataQuantaBuilder(descriptor)

    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    sink_dataquanta = dq_source_a.union(dq_source_b) \
        .sink("../test/output.txt", end="")
    return sink_dataquanta


if __name__ == '__main__':
    # Descriptor holds general info about the Wayang Plan created here.
    descriptor = Descriptor()

    plan_dataquanta_sink = plan_tpch_q1(descriptor)
    plan_dataquanta_sink.execute()
# --- pywayang/orchestrator/main.py ---
from orchestrator.plan import Descriptor
from orchestrator.dataquanta import DataQuantaBuilder
# NOTE(review): datetime is only needed by the disabled TPC-H filter step;
# kept so that step can be re-enabled without touching imports.
import datetime


def plan_sort(descriptor):
    """Demo plan: text source -> case-insensitive sort -> file sink."""
    plan = DataQuantaBuilder(descriptor)
    return plan.source("../test/words.txt") \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")


def plan_sort_filter(descriptor):
    """Demo plan: source -> sort -> keep words starting with 'f' -> sink."""
    plan = DataQuantaBuilder(descriptor)
    return plan.source("../test/words.txt") \
        .sort(lambda elem: elem.lower()) \
        .filter(lambda elem: str(elem).startswith("f")) \
        .sink("../test/output.txt", end="")


def plan_filter_text(descriptor):
    """Demo plan: source -> keep words starting with 'f' -> sink."""
    plan = DataQuantaBuilder(descriptor)
    return plan.source("../test/words.txt") \
        .filter(lambda elem: str(elem).startswith("f")) \
        .sink("../test/output.txt", end="")


def plan_filter(descriptor):
    """Demo plan: numeric source -> keep odd numbers -> sink."""
    plan = DataQuantaBuilder(descriptor)
    return plan.source("../test/numbers.txt") \
        .filter(lambda elem: int(elem) % 2 != 0) \
        .sink("../test/output.txt", end="")


def plan_basic(descriptor):
    """Smallest possible demo plan: source -> sink."""
    plan = DataQuantaBuilder(descriptor)
    return plan.source("../test/lines.txt") \
        .sink("../test/output.txt", end="")


def plan_junction(descriptor):
    """Demo plan joining three (partly filtered) sources with unions."""
    plan = DataQuantaBuilder(descriptor)
    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt") \
        .filter(lambda elem: str(elem).startswith("I"))
    dq_source_c = plan.source("../test/lastlines.txt") \
        .filter(lambda elem: str(elem).startswith("W"))
    return dq_source_a.union(dq_source_b) \
        .union(dq_source_c) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")


def plan_java_junction(descriptor):
    """Demo plan whose union/filter/sort chain is meant to run on Java."""
    plan = DataQuantaBuilder(descriptor)
    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    return dq_source_a.union(dq_source_b) \
        .filter(lambda elem: str(elem).startswith("I")) \
        .sort(lambda elem: elem.lower()) \
        .sink("../test/output.txt", end="")


def plan_tpch_q1(descriptor):
    """Partial TPC-H Q1 demo: currently only splits lineitem rows into fields.

    The date filter, projection and reduce_by_key aggregation steps are
    pending on platform support; `reducer` below is the intended combiner.
    """
    # TODO create reduce by
    plan = DataQuantaBuilder(descriptor)

    def reducer(obj1, obj2):
        # Intended for reduce_by_key([0, 1], reducer): keep the two grouping
        # columns and sum the remaining measures pairwise.
        return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], \
            obj1[4] + obj2[4], obj1[5] + obj2[5], obj1[6] + obj2[6], \
            obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]

    sink = plan.source("../test/lineitem.txt") \
        .map(lambda elem: elem.split("|")) \
        .sink("../test/output.txt", end="")
    return sink


def plan_full_java(descriptor):
    """Demo plan with two sources and a union, fully executable on Java."""
    plan = DataQuantaBuilder(descriptor)
    dq_source_a = plan.source("../test/lines.txt")
    dq_source_b = plan.source("../test/morelines.txt")
    return dq_source_a.union(dq_source_b) \
        .sink("../test/output.txt", end="")


def plan_wordcount(descriptor):
    """Demo plan: keep rows with a short first column, split rows into fields."""
    plan = DataQuantaBuilder(descriptor)
    return plan.source("../test/lineitem.txt") \
        .filter(lambda elem: len(str(elem).split("|")[0]) < 4) \
        .flatmap(lambda elem: str(elem).split("|")) \
        .sink("../test/output.txt", end="")


if __name__ == '__main__':
    # Descriptor holds general info about the Wayang plan created here;
    # register both target platforms so the optimizer may choose.
    descriptor = Descriptor()
    descriptor.add_plugin(Descriptor.Plugin.spark)
    descriptor.add_plugin(Descriptor.Plugin.java)

    plan_dataquanta_sink = plan_wordcount(descriptor)
    # Alternative entry points: plan_dataquanta_sink.execute() / .console()
    plan_dataquanta_sink.to_wayang_plan()
# --- pywayang/orchestrator/operator.py ---
import pickle
import cloudpickle
from config.config_reader import get_source_types
from config.config_reader import get_sink_types
from config.config_reader import get_boundary_types
import logging

pickle_protocol = pickle.HIGHEST_PROTOCOL


class Operator:
    """One operation over an intermediate result.

    Each operation may be processed by the Python or the Java platform;
    `python_exec` records which. Operators form a DAG via the
    `predecessor`/`successor` lists.
    """

    def __init__(
        self, operator_type=None, udf=None, previous=None,
        iterator=None, python_exec=False
    ):
        # Operator ID: the CPython object id doubles as a unique plan-wide id.
        self.id = id(self)

        self.operator_type = operator_type

        # Boundary operators delimit execution stages.
        self.boundary = self.operator_type in get_boundary_types()

        # UDF function (or, for sources/sinks, the file path).
        self.udf = udf

        # Source types must come with an iterator over their input.
        self.iterator = iterator
        if operator_type in get_source_types():
            if iterator is None:
                # BUG FIX: this used to print and then execute a bare `raise`,
                # which (with no active exception) surfaced as an unrelated
                # RuntimeError. Raise an explicit, catchable error instead.
                raise ValueError(
                    "Source Operator Type without an Iterator")
            self.source = True
        else:
            self.source = False

        self.sink = operator_type in get_sink_types()

        # TODO Why managing previous and predecessors per separate?
        self.previous = previous

        self.successor = []
        self.predecessor = []

        self.parameters = {}

        # Wire this operator into the DAG: every non-None previous operator
        # becomes a predecessor here and gains this operator as a successor.
        if self.previous:
            for prev in self.previous:
                if prev is not None:
                    prev.set_successor(self)
                    self.set_predecessor(prev)

        self.python_exec = python_exec

        # Lazy %-style args avoid building the message when INFO is disabled.
        logging.info(
            "Operator:%s, type:%s, PythonExecutable: %s, is boundary: %s, "
            "is source: %s, is sink: %s",
            self.getID(), self.operator_type, self.python_exec,
            self.is_boundary(), self.source, self.sink)

    def getID(self):
        """Return the operator's unique id."""
        return self.id

    def is_source(self):
        return self.source

    def is_sink(self):
        return self.sink

    def is_boundary(self):
        return self.boundary

    def serialize_udf(self):
        """Replace self.udf in place with its cloudpickle serialization."""
        self.udf = cloudpickle.dumps(self.udf)

    def getIterator(self):
        """Recursively evaluate this operator over its predecessor chain."""
        if self.is_source():
            return self.iterator
        # TODO this should iterate through previous REDESIGN
        return self.udf(self.previous[0].getIterator())

    def set_parameter(self, key, value):
        self.parameters[key] = value

    def set_successor(self, suc):
        # Sinks never get successors; also avoid duplicate edges.
        if (not self.is_sink()) and self.successor.count(suc) == 0:
            self.successor.append(suc)

    def set_predecessor(self, suc):
        if self.predecessor.count(suc) == 0:
            self.predecessor.append(suc)
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +from enum import Enum + +class Descriptor: + + def __init__(self): + self.sinks = [] + self.sources = [] + self.boundary_operators = None + logging.basicConfig(filename='../config/execution.log', level=logging.DEBUG) + self.plugins = [] + + class Plugin(Enum): + java = 0 + spark = 1 + + def get_boundary_operators(self): + return self.boundary_operators + + def add_source(self, operator): + self.sources.append(operator) + + def get_sources(self): + return self.sources + + def add_sink(self, operator): + self.sinks.append(operator) + + def get_sinks(self): + return self.sinks + + def add_plugin(self, plugin): + self.plugins.append(plugin) + + def get_plugins(self): + return self.plugins diff --git a/pywayang/protobuf/__init__.py b/pywayang/protobuf/__init__.py new file mode 100644 index 00000000..15a80ad9 --- /dev/null +++ b/pywayang/protobuf/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import protobuf.pywayangplan_pb2 diff --git a/pywayang/protobuf/old_planwriter.py b/pywayang/protobuf/old_planwriter.py new file mode 100644 index 00000000..e8700f01 --- /dev/null +++ b/pywayang/protobuf/old_planwriter.py @@ -0,0 +1,308 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# --- pywayang/protobuf/old_planwriter.py ---
# Superseded by planwriter.py; kept in the tree as a reference implementation.
import protobuf.pywayangplan_pb2 as pwb
import os
import pickle
import struct
import base64


class OldMessageWriter:
    """First protobuf writer: walks a linear plan and persists it to disk."""

    def __init__(self, descriptor):
        # Assumes exactly one source and one sink (linear demo plans only).
        sink = descriptor.get_sinks()[0]
        source = descriptor.get_sources()[0]

        # Walk the single-successor chain, serializing every middle operator.
        op = source
        visited = []
        middle_operators = []
        while op.sink is not True and len(op.successor) > 0:
            pre = op.successor[0]
            if pre not in visited and pre.sink is not True:
                pre.serialize_udf()
                middle_operators.append(pre)
                # BUG FIX: `visited` was checked but never appended to, making
                # the guard dead (harmless only because the chain is linear).
                visited.append(pre)
            op = pre

        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
        planconf = pwb.WayangPlan()
        try:
            f = open(finalpath, "rb")
            planconf.ParseFromString(f.read())
            f.close()
        except IOError:
            print(finalpath + ": Could not open file. Creating a new one.")

        so = pwb.Source()
        so.id = source.id
        so.type = source.operator_type
        so.path = os.path.abspath(source.udf)

        operators = []
        for mid in middle_operators:
            op = pwb.Operator()
            op.id = mid.id
            op.type = mid.operator_type
            op.udf = mid.udf
            operators.append(op)

        si = pwb.Sink()
        si.id = sink.id
        si.type = sink.operator_type
        si.path = os.path.abspath(sink.udf)

        plan = pwb.Plan()
        plan.source.CopyFrom(so)
        plan.sink.CopyFrom(si)
        plan.operators.extend(operators)
        plan.input = pwb.Plan.string
        plan.output = pwb.Plan.string

        ctx = pwb.Context()
        ctx.platforms.extend([pwb.Context.Platform.java])

        planconf.plan.CopyFrom(plan)
        planconf.context.CopyFrom(ctx)

        f = open(finalpath, "wb")
        f.write(planconf.SerializeToString())
        f.close()


class func_inteface:
    """Callable wrapper chaining a node's UDF after an already-nested UDF."""

    def __init__(self, node, nested_udf):
        self.node = node
        self.nested_udf = nested_udf

    def func(self, iterable):
        return self.node.operator.udf(self.nested_udf(iterable))


class MessageWriter:
    """Earlier draft of planwriter.MessageWriter; retained for reference.

    NOTE(review): the add_* methods assign directly to repeated proto fields
    (`predecessors`, `successors`) and set `udf = None`, which the protobuf
    API rejects at runtime — presumably why this writer was superseded.
    """
    sources = []
    operators = []
    sinks = []

    def __init__(self):
        print("lala")

    def add_source(self, operator_id, operator_type, path, predecessors, successors):
        source = pwb.OperatorProto()
        source.id = operator_id
        source.type = operator_type
        source.path = os.path.abspath(path)
        source.udf = None
        source.predecessors = predecessors
        source.successors = successors
        self.sources.append(source)

    def add_sink(self, operator_id, operator_type, path, predecessors, successors):
        sink = pwb.OperatorProto()
        sink.id = operator_id
        sink.type = operator_type
        sink.path = os.path.abspath(path)
        sink.udf = None
        sink.predecessors = predecessors
        sink.successors = successors
        self.sinks.append(sink)

    def add_operator(self, operator_id, operator_type, udf, path, predecessors, successors):
        op = pwb.OperatorProto()
        op.id = operator_id
        op.type = operator_type
        op.udf = udf
        op.path = path
        op.predecessors = predecessors
        op.successors = successors
        self.operators.append(op)

    def process_pipeline(self, stage):
        """Debug walk over a stage: fuses consecutive Python-executable
        operators into one callable and prints its evaluation on a sample.

        The commented-out experiment code and repeated print blocks of the
        original were removed; the fusion logic is unchanged.
        """
        sample = ["Wilo", "lifo", "Wifo"]
        nested_udf = None
        nested_id = ""
        for node in reversed(stage):
            print(node.operator_type, "executable:", node.python_exec, "id:", node.id)

            if not node.python_exec:
                # Boundary reached: flush the accumulated Python fusion.
                if nested_udf is not None:
                    print("node", nested_id)
                    for item in nested_udf(sample):
                        print(item)
                    nested_udf = None
                    nested_id = ""
            else:
                if nested_udf is None:
                    nested_udf = node.operator.udf
                    nested_id = node.id
                else:
                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
                    nested_id = str(node.id) + "," + str(nested_id)

        # Flush a fusion that reaches the start of the stage.
        if nested_udf is not None:
            print("node", nested_id)
            for item in nested_udf(sample):
                print(item)

    def concatenate(self, function_a, function_b):
        """Compose two iterable transformers: result(it) == a(b(it))."""
        def executable(iterable):
            return function_a(function_b(iterable))
        return executable

    def old(self, descriptor):
        """Legacy file-writing path; duplicates OldMessageWriter.__init__."""
        sink = descriptor.get_sinks()[0]
        source = descriptor.get_sources()[0]

        op = source
        visited = []
        middle_operators = []
        while op.sink is not True and len(op.successor) > 0:
            pre = op.successor[0]
            if pre not in visited and pre.sink is not True:
                pre.serialize_udf()
                middle_operators.append(pre)
                visited.append(pre)  # BUG FIX: visited was never updated
            op = pre

        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
        planconf = pwb.WayangPlan()
        try:
            f = open(finalpath, "rb")
            planconf.ParseFromString(f.read())
            f.close()
        except IOError:
            print(finalpath + ": Could not open file. Creating a new one.")

        so = pwb.Source()
        so.id = source.id
        so.type = source.operator_type
        so.path = os.path.abspath(source.udf)

        operators = []
        for mid in middle_operators:
            op = pwb.Operator()
            op.id = mid.id
            op.type = mid.operator_type
            op.udf = mid.udf
            operators.append(op)

        si = pwb.Sink()
        si.id = sink.id
        si.type = sink.operator_type
        si.path = os.path.abspath(sink.udf)

        plan = pwb.Plan()
        plan.source.CopyFrom(so)
        plan.sink.CopyFrom(si)
        plan.operators.extend(operators)
        plan.input = pwb.Plan.string
        plan.output = pwb.Plan.string

        ctx = pwb.Context()
        ctx.platforms.extend([pwb.Context.Platform.java])

        planconf.plan.CopyFrom(plan)
        planconf.context.CopyFrom(ctx)

        f = open(finalpath, "wb")
        f.write(planconf.SerializeToString())
        f.close()

    def pipeline_singleton(self):
        print("lala")
# --- pywayang/protobuf/planwriter.py ---
import protobuf.pywayangplan_pb2 as pwb
import os
import cloudpickle
import logging
import pathlib
import requests
import base64


class MessageWriter:
    """Assembles a WayangPlanProto from processed stages and either writes it
    to disk or POSTs it (base64-encoded) to the Wayang REST API."""

    def __init__(self):
        # BUG FIX: these used to be class attributes, so every MessageWriter
        # instance shared — and kept accreting — the same lists and dicts.
        self.sources = []
        self.operators = []
        self.sinks = []
        # operator id (str) -> OperatorProto it ended up in (fused ops share one)
        self.operator_references = {}
        # operator id(s) -> {"start": predecessor ids, "end": successor ids}
        self.boundaries = {}

    def add_source(self, operator_id, operator_type, path):
        """Create, register and return a source OperatorProto."""
        source = pwb.OperatorProto()
        source.id = str(operator_id)
        source.type = operator_type
        source.path = os.path.abspath(path)
        source.udf = chr(0).encode('utf-8')  # placeholder: sources carry no UDF
        self.sources.append(source)
        return source

    def add_sink(self, operator_id, operator_type, path):
        """Create, register and return a sink OperatorProto."""
        sink = pwb.OperatorProto()
        sink.id = str(operator_id)
        sink.type = operator_type
        sink.path = os.path.abspath(path)
        sink.udf = chr(0).encode('utf-8')  # placeholder: sinks carry no UDF
        self.sinks.append(sink)
        return sink

    def add_operator(self, operator_id, operator_type, udf):
        """Create and register a Python operator.

        Python operators need no extra parameters: the cloudpickled UDF is
        ready to be executed directly on the worker.
        """
        op = pwb.OperatorProto()
        op.id = str(operator_id)
        op.type = operator_type
        op.udf = cloudpickle.dumps(udf)
        op.path = str(None)
        self.operators.append(op)
        return op

    def add_java_operator(self, operator_id, operator_type, udf, parameters):
        """Create and register an operator to be executed by Java, carrying
        its parameters as a string-valued proto map."""
        op = pwb.OperatorProto()
        op.id = str(operator_id)
        op.type = operator_type
        op.udf = cloudpickle.dumps(udf)
        op.path = str(None)
        # Proto map fields cannot be assigned wholesale; copy entry by entry.
        for param in parameters:
            op.parameters[param] = str(parameters[param])
        self.operators.append(op)
        return op

    def process_pipeline(self, stage):
        """Translate a chain of plan operators into Wayang proto operators.

        Consecutive Python-executable operators are fused into a single
        `map_partition` operator whose UDF is the composition of their UDFs;
        sources, sinks and Java operators pass through individually.
        """
        nested_udf = None
        nested_id = ""
        nested_predecessors = None
        nested_successors = None
        for node in reversed(stage):
            logging.debug("%s executable: %s id: %s",
                          node.operator_type, node.python_exec, node.id)

            if not node.python_exec:
                if nested_udf is not None:
                    # Flush the accumulated Python fusion as one operator.
                    # Predecessors come from the last fused operator,
                    # successors from the first.
                    op = self.add_operator(nested_id, "map_partition", nested_udf)
                    for op_id in str(nested_id).split(","):
                        self.operator_references[str(op_id)] = op
                    self.boundaries[str(nested_id)] = {
                        "end": nested_successors,
                        "start": nested_predecessors,
                    }
                    nested_udf = None
                    nested_id = ""
                    nested_predecessors = None
                    nested_successors = None

                if node.operator.source:
                    op = self.add_source(node.id, node.operator_type, node.operator.udf)
                    self.operator_references[str(node.id)] = op
                    self.boundaries[str(node.id)] = {"end": node.successors.keys()}

                elif node.operator.sink:
                    op = self.add_sink(node.id, node.operator_type, node.operator.udf)
                    self.operator_references[str(node.id)] = op
                    self.boundaries[str(node.id)] = {"start": node.predecessors.keys()}

                # Regular operator to be processed in Java; may carry
                # additional parameters for the Java side.
                else:
                    op = self.add_java_operator(
                        node.id, node.operator_type,
                        node.operator.udf, node.operator.parameters)
                    self.operator_references[str(node.id)] = op
                    self.boundaries[str(node.id)] = {
                        "start": node.predecessors.keys(),
                        "end": node.successors.keys(),
                    }

            else:
                if nested_udf is None:
                    nested_udf = node.operator.udf
                    nested_id = node.id
                    # Last operator to execute in the fused map_partition.
                    nested_successors = node.successors.keys()
                else:
                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
                    nested_id = str(node.id) + "," + str(nested_id)

                # Every iteration keeps the earliest known predecessors.
                nested_predecessors = node.predecessors.keys()

        # A pipeline may also begin (in plan order) with Python operators.
        if nested_udf is not None:
            # BUG FIX: the created operator was not captured, so the stale
            # `op` left over from the loop above was registered for these ids.
            op = self.add_operator(nested_id, "map_partition", nested_udf)
            for op_id in nested_id.split(","):
                self.operator_references[op_id] = op
            self.boundaries[nested_id] = {
                "end": nested_successors,
                "start": nested_predecessors,
            }

    @staticmethod
    def concatenate(function_a, function_b):
        """Compose two iterable transformers: result(it) == a(b(it))."""
        def executable(iterable):
            return function_a(function_b(iterable))

        return executable

    def set_dependencies(self):
        """Resolve recorded boundary ids into proto predecessor/successor lists."""
        def ref_ids(op_ids):
            # Map original operator ids to the ids of the protos they landed in
            # (fused Python operators share one proto).
            return [str(self.operator_references[str(i)].id) for i in op_ids]

        for source in self.sources:
            if 'end' in self.boundaries[source.id]:
                source.successors.extend(ref_ids(self.boundaries[source.id]['end']))

        for sink in self.sinks:
            if 'start' in self.boundaries[sink.id]:
                sink.predecessors.extend(ref_ids(self.boundaries[sink.id]['start']))

        for op in self.operators:
            if 'start' in self.boundaries[op.id]:
                op.predecessors.extend(ref_ids(self.boundaries[op.id]['start']))
            if 'end' in self.boundaries[op.id]:
                op.successors.extend(ref_ids(self.boundaries[op.id]['end']))

    def _build_plan_configuration(self, descriptor):
        """Assemble a fresh WayangPlanProto from the registered operators."""
        plan = pwb.PlanProto()
        plan.sources.extend(self.sources)
        plan.operators.extend(self.operators)
        plan.sinks.extend(self.sinks)
        plan.input = pwb.PlanProto.string
        plan.output = pwb.PlanProto.string

        ctx = pwb.ContextProto()
        for plug in descriptor.plugins:
            ctx.platforms.append(plug.value)

        plan_configuration = pwb.WayangPlanProto()
        plan_configuration.plan.CopyFrom(plan)
        plan_configuration.context.CopyFrom(ctx)
        return plan_configuration

    def write_message(self, descriptor):
        """Serialize the plan to the local ../../protobuf/wayang_message file."""
        finalpath = "../../protobuf/wayang_message"
        plan_configuration = pwb.WayangPlanProto()

        try:
            with open(finalpath, "rb") as f:
                plan_configuration.ParseFromString(f.read())
        except IOError:
            # logging.warn is deprecated; use warning().
            logging.warning("File %s did not exist. System generated a new file", finalpath)

        built = self._build_plan_configuration(descriptor)
        plan_configuration.plan.CopyFrom(built.plan)
        plan_configuration.context.CopyFrom(built.context)

        with open(finalpath, "wb") as f:
            f.write(plan_configuration.SerializeToString())

    def send_message(self, descriptor):
        """POST the base64-encoded plan to the Wayang REST API."""
        plan_configuration = self._build_plan_configuration(descriptor)
        logging.debug(plan_configuration)

        msg_bytes = plan_configuration.SerializeToString()
        msg_64 = base64.b64encode(msg_bytes)

        logging.debug(msg_bytes)
        data = {
            'message': msg_64
        }
        response = requests.post("http://localhost:8080/plan/create", data)
        logging.debug(response)
+# +import unittest + + +class MyTestCase(unittest.TestCase): + + def test_something(self): + self.assertEqual(True, False) + + def test_upper(self): + self.assertEqual('foo'.upper(), 'FOO') + + +if __name__ == '__main__': + unittest.main() diff --git a/pywayang/test/full_java_test.py b/pywayang/test/full_java_test.py new file mode 100644 index 00000000..d17aedd0 --- /dev/null +++ b/pywayang/test/full_java_test.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import unittest +from orchestrator.plan import Descriptor +from orchestrator.dataquanta import DataQuantaBuilder + + +class MyTestCase(unittest.TestCase): + + def test_most_basic(self): + descriptor = Descriptor() + descriptor.add_plugin(Descriptor.Plugin.java) + + plan = DataQuantaBuilder(descriptor) + sink_dataquanta = \ + plan.source("../test/lines.txt") \ + .sink("../test/output.txt", end="") + + sink_dataquanta.to_wayang_plan() + + + def test_single_juncture(self): + descriptor = Descriptor() + descriptor.add_plugin(Descriptor.Plugin.java) + + plan = DataQuantaBuilder(descriptor) + dq_source_a = plan.source("../test/lines.txt") + dq_source_b = plan.source("../test/morelines.txt") + sink_dataquanta = dq_source_a.union(dq_source_b) \ + .sink("../test/output.txt", end="") + + sink_dataquanta.to_wayang_plan() + + + def test_multiple_juncture(self): + descriptor = Descriptor() + descriptor.add_plugin(Descriptor.Plugin.java) + + plan = DataQuantaBuilder(descriptor) + dq_source_a = plan.source("../test/lines.txt") + dq_source_b = plan.source("../test/morelines.txt") \ + .filter(lambda elem: str(elem).startswith("I")) + dq_source_c = plan.source("../test/lastlines.txt") \ + .filter(lambda elem: str(elem).startswith("W")) + + sink_dataquanta = dq_source_a.union(dq_source_b) \ + .union(dq_source_c) \ + .sort(lambda elem: elem.lower()) \ + .sink("../test/output.txt", end="") + + sink_dataquanta.to_wayang_plan() + + +if __name__ == '__main__': + unittest.main() diff --git a/pywayang/test/full_spark_test.py b/pywayang/test/full_spark_test.py new file mode 100644 index 00000000..9276ccc6 --- /dev/null +++ b/pywayang/test/full_spark_test.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
from orchestrator.plan import Descriptor
from orchestrator.dataquanta import DataQuantaBuilder


class MyTestCase(unittest.TestCase):
    """Spark-platform counterparts of the tests in full_java_test.py.

    BUG FIX: the original defined ``test_*`` as module-level functions that
    took a ``self`` parameter but belonged to no class, so
    ``unittest.main()`` discovered no tests and the functions were
    uncallable as written.  They are now methods of a TestCase, mirroring
    full_java_test.py.
    """

    def test_most_basic(self):
        # One source flowing directly into a sink.
        descriptor = Descriptor()
        descriptor.add_plugin(Descriptor.Plugin.spark)

        plan = DataQuantaBuilder(descriptor)
        sink_dataquanta = \
            plan.source("../test/lines.txt") \
                .sink("../test/output.txt", end="")

        sink_dataquanta.to_wayang_plan()

    def test_single_juncture(self):
        # Two sources merged by a single union before the sink.
        descriptor = Descriptor()
        descriptor.add_plugin(Descriptor.Plugin.spark)

        plan = DataQuantaBuilder(descriptor)
        dq_source_a = plan.source("../test/lines.txt")
        dq_source_b = plan.source("../test/morelines.txt")
        sink_dataquanta = dq_source_a.union(dq_source_b) \
            .sink("../test/output.txt", end="")

        sink_dataquanta.to_wayang_plan()

    def test_multiple_juncture(self):
        # Three sources (two filtered), two unions, a sort, then the sink.
        descriptor = Descriptor()
        descriptor.add_plugin(Descriptor.Plugin.spark)

        plan = DataQuantaBuilder(descriptor)
        dq_source_a = plan.source("../test/lines.txt")
        dq_source_b = plan.source("../test/morelines.txt") \
            .filter(lambda elem: str(elem).startswith("I"))
        dq_source_c = plan.source("../test/lastlines.txt") \
            .filter(lambda elem: str(elem).startswith("W"))

        sink_dataquanta = dq_source_a.union(dq_source_b) \
            .union(dq_source_c) \
            .sort(lambda elem: elem.lower()) \
            .sink("../test/output.txt", end="")

        sink_dataquanta.to_wayang_plan()


if __name__ == '__main__':
    unittest.main()
