This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-211 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit c113fa92a0cc33e4016ee97fbcd2bf3f92ee2977 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Sun Jun 19 23:26:57 2022 +0200 [WAYANG-211] python java implementation Signed-off-by: bertty <[email protected]> --- python/src/pywy/platforms/jvm/worker.py | 374 +++++++++++++++++++++ wayang-api/wayang-api-python/pom.xml | 8 + .../wayang/api/python/executor/ProcessFeeder.java | 184 ++++++++++ .../api/python/executor/ProcessReceiver.java | 53 +++ .../api/python/executor/PythonProcessCaller.java | 130 +++++++ .../api/python/executor/PythonWorkerManager.java | 70 ++++ .../wayang/api/python/executor/ReaderIterator.java | 98 ++++++ .../wayang/api/python/function/PythonCode.java | 32 ++ .../api/python/function/PythonFunctionWrapper.java | 46 +++ .../wayang/api/python/function/PythonUDF.java | 25 ++ .../wayang-api-python-defaults.properties | 3 +- .../server/spring/decoder/WayangPlanBuilder.java | 239 +++++++++++++ .../server/spring/general/WayangController.java | 304 +++++++++++++++++ 13 files changed, 1565 insertions(+), 1 deletion(-) diff --git a/python/src/pywy/platforms/jvm/worker.py b/python/src/pywy/platforms/jvm/worker.py new file mode 100644 index 00000000..9e1fe3bb --- /dev/null +++ b/python/src/pywy/platforms/jvm/worker.py @@ -0,0 +1,374 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import socket +import struct +import pickle +from itertools import chain + +import cloudpickle +import base64 +import re +import sys +import time + +class SpecialLengths(object): + END_OF_DATA_SECTION = -1 + PYTHON_EXCEPTION_THROWN = -2 + TIMING_DATA = -3 + END_OF_STREAM = -4 + NULL = -5 + START_ARROW_STREAM = -6 + + +def read_int(stream): + length = stream.read(4) + if not length: + raise EOFError + res = struct.unpack("!i", length)[0] + return res + + +class UTF8Deserializer: + """ + Deserializes streams written by String.getBytes. + """ + + def __init__(self, use_unicode=True): + self.use_unicode = use_unicode + + def loads(self, stream): + length = read_int(stream) + if length == SpecialLengths.END_OF_DATA_SECTION: + raise EOFError + elif length == SpecialLengths.NULL: + return None + s = stream.read(length) + return s.decode("utf-8") if self.use_unicode else s + + def load_stream(self, stream): + try: + while True: + yield self.loads(stream) + except struct.error: + return + except EOFError: + return + + def __repr__(self): + return "UTF8Deserializer(%s)" % self.use_unicode + + +def write_int(p, outfile): + outfile.write(struct.pack("!i", p)) + + +def write_with_length(obj, stream): + serialized = obj.encode('utf-8') + if serialized is None: + raise ValueError("serialized value should not be None") + if len(serialized) > (1 << 31): + raise ValueError("can not serialize object larger than 2G") + write_int(len(serialized), stream) + stream.write(serialized) + + +class Serializer: + def dump_stream(self, iterator, stream): + """ + Serialize an iterator of objects to the output stream. + """ + raise NotImplementedError + + def load_stream(self, stream): + """ + Return an iterator of deserialized objects from the input stream. + """ + raise NotImplementedError + + def dumps(self, obj): + """ + Serialize an object into a byte array. 
+ When batching is used, this will be called with an array of objects. + """ + raise NotImplementedError + + def _load_stream_without_unbatching(self, stream): + """ + Return an iterator of deserialized batches (iterable) of objects from the input stream. + If the serializer does not operate on batches the default implementation returns an + iterator of single element lists. + """ + return map(lambda x: [x], self.load_stream(stream)) + + # Note: our notion of "equality" is that output generated by + # equal serializers can be deserialized using the same serializer. + + # This default implementation handles the simple cases; + # subclasses should override __eq__ as appropriate. + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self.__eq__(other) + + def __repr__(self): + return "%s()" % self.__class__.__name__ + + def __hash__(self): + return hash(str(self)) + +class FramedSerializer(Serializer): + + """ + Serializer that writes objects as a stream of (length, data) pairs, + where `length` is a 32-bit integer and data is `length` bytes. 
+ """ + + def dump_stream(self, iterator, stream): + for obj in iterator: + self._write_with_length(obj, stream) + + def load_stream(self, stream): + while True: + try: + yield self._read_with_length(stream) + except EOFError: + return + + def _write_with_length(self, obj, stream): + serialized = self.dumps(obj) + if serialized is None: + raise ValueError("serialized value should not be None") + if len(serialized) > (1 << 31): + raise ValueError("can not serialize object larger than 2G") + write_int(len(serialized), stream) + stream.write(serialized) + + def _read_with_length(self, stream): + length = read_int(stream) + if length == SpecialLengths.END_OF_DATA_SECTION: + raise EOFError + elif length == SpecialLengths.NULL: + return None + obj = stream.read(length) + if len(obj) < length: + raise EOFError + return self.loads(obj) + + def dumps(self, obj): + """ + Serialize an object into a byte array. + When batching is used, this will be called with an array of objects. + """ + raise NotImplementedError + + def loads(self, obj): + """ + Deserialize an object from a byte array. + """ + raise NotImplementedError + + +class BatchedSerializer(Serializer): + + """ + Serializes a stream of objects in batches by calling its wrapped + Serializer with streams of objects. 
+ """ + + UNLIMITED_BATCH_SIZE = -1 + UNKNOWN_BATCH_SIZE = 0 + + def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE): + self.serializer = serializer + self.batchSize = batchSize + + def _batched(self, iterator): + if self.batchSize == self.UNLIMITED_BATCH_SIZE: + print("hahahhaha") + yield list(iterator) + elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"): + n = len(iterator) + for i in range(0, n, self.batchSize): + toc = time.perf_counter() + print(f"batched toc1={toc:0.4f}") + yield iterator[i : i + self.batchSize] + else: + items = [] + count = 0 + for item in iterator: + items.append(item) + count += 1 + if count == self.batchSize: + yield items + items = [] + count = 0 + if items: + yield items + + def dump_stream(self, iterator, stream): + self.serializer.dump_stream(self._batched(iterator), stream) + + def load_stream(self, stream): + return chain.from_iterable(self._load_stream_without_unbatching(stream)) + + def _load_stream_without_unbatching(self, stream): + return self.serializer.load_stream(stream) + + def __repr__(self): + return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize) + + +class PickleSerializer(FramedSerializer): + + """ + Serializes objects using Python's pickle serializer: + + http://docs.python.org/2/library/pickle.html + + This serializer supports nearly any Python object, but may + not be as fast as more specialized serializers. 
+ """ + + def dumps(self, obj): + return pickle.dumps(obj, pickle_protocol) + + def loads(self, obj, encoding="bytes"): + return pickle.loads(obj, encoding=encoding) + +pickle_protocol = pickle.HIGHEST_PROTOCOL +class CloudPickleSerializer(FramedSerializer): + def dumps(self, obj): + try: + return cloudpickle.dumps(obj, pickle_protocol) + except pickle.PickleError: + raise + except Exception as e: + emsg = str(e) + if "'i' format requires" in emsg: + msg = "Object too large to serialize: %s" % emsg + else: + msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg) +# print_exec(sys.stderr) + raise pickle.PicklingError(msg) + + def loads(self, obj, encoding="bytes"): + return cloudpickle.loads(obj, encoding=encoding) + +#if sys.version_info < (3, 8): +CPickleSerializer = PickleSerializer +#else: +# CPickleSerializer = CloudPickleSerializer + +def dump_stream(iterator, stream): + + for obj in iterator: + if type(obj) is str: + print("here?2") + write_with_length(obj, stream) + ## elif type(obj) is list: + ## write_with_length(obj, stream) + print("Termine") + write_int(SpecialLengths.END_OF_DATA_SECTION, stream) + print("Escribi Fin") + + +def process(infile, outfile): + """udf64 = os.environ["UDF"] + print("udf64") + print(udf64) + #serialized_udf = binascii.a2b_base64(udf64) + #serialized_udf = base64.b64decode(udf64) + serialized_udf = bytearray(udf64, encoding='utf-16') + # NOT VALID TO BE UTF8 serialized_udf = bytes(udf64, 'UTF-8') + print("serialized_udf") + print(serialized_udf) + # input to be ast.literal_eval(serialized_udf) + func = pickle.loads(serialized_udf, encoding="bytes") + print ("func") + print (func) + print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) + # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])""" + + + + # TODO First we must receive the operator + UDF + """udf = lambda elem: elem.lower() + + def func(it): + return sorted(it, key=udf)""" + udf_length = read_int(infile) + print("udf_length") + print(udf_length) + serialized_udf = 
infile.read(udf_length) + print("serialized_udf") + print(serialized_udf) + #base64_message = base64.b64decode(serialized_udf + "===") + #print("base64_message") + #print(base64_message) + func = pickle.loads(serialized_udf) + #func = ori.lala(serialized_udf) + #print (func) + #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x) + + + """print("example") + for x in func("2344|234|efrf|$#|ffrf"): print(x)""" + # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types + iterator = UTF8Deserializer().load_stream(infile) + # out_iter = sorted(iterator, key=lambda elem: elem.lower()) + # out_iter = batched(func(iterator)) + ser = BatchedSerializer(CPickleSerializer(), 100) + ser.dump_stream(func(iterator), outfile) + #dump_stream(iterator=out_iter, stream=outfile) + + +def local_connect(port): + sock = None + errors = [] + # Support for both IPv4 and IPv6. + # On most of IPv6-ready systems, IPv6 will take precedence. + for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM): + af, socktype, proto, _, sa = res + try: + sock = socket.socket(af, socktype, proto) + # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15))) + sock.settimeout(30) + sock.connect(sa) + # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536))) + sockfile = sock.makefile("rwb", 65536) + # _do_server_auth(sockfile, auth_secret) + return (sockfile, sock) + except socket.error as e: + emsg = str(e) + errors.append("tried to connect to %s, but an error occurred: %s" % (sa, emsg)) + sock.close() + sock = None + raise Exception("could not open socket: %s" % errors) + + +if __name__ == '__main__': + print("Python version") + print (sys.version) + java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) + sock_file, sock = local_connect(java_port) + process(sock_file, sock_file) + sock_file.flush() + exit() diff --git a/wayang-api/wayang-api-python/pom.xml 
b/wayang-api/wayang-api-python/pom.xml index 5a3ad443..9ac2e2c1 100644 --- a/wayang-api/wayang-api-python/pom.xml +++ b/wayang-api/wayang-api-python/pom.xml @@ -35,4 +35,12 @@ <properties> <java-module-name>org.apache.wayang.api</java-module-name> </properties> + + <dependencies> + <dependency> + <groupId>org.apache.wayang</groupId> + <artifactId>wayang-core</artifactId> + <version>0.6.1-SNAPSHOT</version> + </dependency> + </dependencies> </project> diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java new file mode 100644 index 00000000..ee694322 --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.api.python.executor; + +import org.apache.wayang.api.python.function.PythonCode; +import org.apache.wayang.api.python.function.PythonUDF; +import org.apache.wayang.core.api.exception.WayangException; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Map; + +public class ProcessFeeder<Input, Output> { + + private Socket socket; + private PythonUDF<Input, Output> udf; + private PythonCode serializedUDF; + private Iterable<Input> input; + + //TODO add to a config file + int END_OF_DATA_SECTION = -1; + int NULL = -5; + + public ProcessFeeder( + Socket socket, + PythonUDF<Input, Output> udf, + PythonCode serializedUDF, + Iterable<Input> input){ + + if(input == null) throw new WayangException("Nothing to process with Python API"); + + this.socket = socket; + this.udf = udf; + this.serializedUDF = serializedUDF; + this.input = input; + + } + + public void send(){ + + try{ + //TODO use config buffer size + int BUFFER_SIZE = 8 * 1024; + + BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE); + DataOutputStream dataOut = new DataOutputStream(stream); + + writeUDF(serializedUDF, dataOut); + this.writeIteratorToStream(input.iterator(), dataOut); + dataOut.writeInt(END_OF_DATA_SECTION); + dataOut.flush(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void writeUDF(PythonCode serializedUDF, DataOutputStream dataOut){ + + //write(serializedUDF.toByteArray(), dataOut); + writeBytes(serializedUDF.toByteArray(), dataOut); + System.out.println("UDF written"); + + } + + public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut) + throws IOException { + + System.out.println("iterator being send"); + int buffer = 0; + for (Iterator<Input> it = iter; it.hasNext(); ) { 
+ Input elem = it.next(); + //System.out.println(elem.toString()); + write(elem, dataOut); + } + } + + /*TODO Missing case PortableDataStream */ + public void write(Object obj, DataOutputStream dataOut){ + try { + + if(obj == null) + dataOut.writeInt(this.NULL); + + /** + * Byte Array cases + */ + else if (obj instanceof Byte[] || obj instanceof byte[]) { + System.out.println("Writing Bytes"); + writeBytes(obj, dataOut); + } + /** + * String case + * */ + else if (obj instanceof String) + writeUTF((String) obj, dataOut); + + /** + * Key, Value case + * */ + else if (obj instanceof Map.Entry) + writeKeyValue((Map.Entry) obj, dataOut); + + else{ + throw new WayangException("Unexpected element type " + obj.getClass()); + } + + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void writeBytes(Object obj, DataOutputStream dataOut){ + + try{ + + if (obj instanceof Byte[]) { + + int length = ((Byte[]) obj).length; + + byte[] bytes = new byte[length]; + int j=0; + + // Unboxing Byte values. 
(Byte[] to byte[]) + for(Byte b: ((Byte[]) obj)) + bytes[j++] = b.byteValue(); + + dataOut.writeInt(length); + dataOut.write(bytes); + + } else if (obj instanceof byte[]) { + + dataOut.writeInt(((byte[]) obj).length); + dataOut.write(((byte[]) obj)); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void writeUTF(String str, DataOutputStream dataOut){ + + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + + try { + + dataOut.writeInt(bytes.length); + dataOut.write(bytes); + } catch (SocketException e){ + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){ + + write(obj.getKey(), dataOut); + write(obj.getValue(), dataOut); + } + +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java new file mode 100644 index 00000000..7ef5f983 --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.api.python.executor; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.Iterator; +/*TODO cannot be always string, include definition for every operator +* like: map(udf, inputtype, outputtype)*/ +public class ProcessReceiver<Output> { + + private ReaderIterator<Output> iterator; + + public ProcessReceiver(Socket socket){ + try{ + //TODO use config buffer size + int BUFFER_SIZE = 1 * 1024; + + DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE)); + this.iterator = new ReaderIterator<>(stream); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public Iterable<Output> getIterable(){ + return () -> iterator; + } + + public void print(){ + iterator.forEachRemaining(x -> System.out.println(x.toString())); + + } +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java new file mode 100644 index 00000000..7eca5595 --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.python.executor; + +import java.lang.ProcessBuilder.Redirect; +import org.apache.wayang.api.python.function.PythonCode; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.exception.WayangException; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import java.util.Map; +import org.apache.wayang.core.util.ReflectionUtils; + +public class PythonProcessCaller { + + private Thread process; + private Socket socket; + private ServerSocket serverSocket; + private boolean ready; + + //TODO How to get the config + private Configuration configuration; + + public PythonProcessCaller(PythonCode serializedUDF){ + + //TODO create documentation to how to the configuration in the code + this.configuration = new Configuration(); + this.configuration.load(ReflectionUtils.loadResource("wayang-api-python-defaults.properties")); + this.ready = false; + byte[] addr = new byte[4]; + addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1; + + try { + /*TODO should NOT be assigned an specific port, set port as 0 (zero)*/ + this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr)); + + Runnable run1 = () -> { + ProcessBuilder pb = new ProcessBuilder( + Arrays.asList( + "python3", + this.configuration.getStringProperty("wayang.api.python.worker") + ) + ); + Map<String, String> workerEnv = pb.environment(); + workerEnv.put("PYTHON_WORKER_FACTORY_PORT", + 
String.valueOf(this.serverSocket.getLocalPort())); + + // TODO See what is happening with ENV Python version + workerEnv.put( + "PYTHONPATH", + this.configuration.getStringProperty("wayang.api.python.path") + ); + + pb.redirectOutput(Redirect.INHERIT); + pb.redirectError(Redirect.INHERIT); + try { + pb.start(); + } catch (IOException e) { + e.printStackTrace(); + } + }; + this.process = new Thread(run1); + this.process.start(); + + // Redirect worker stdout and stderr + //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream) + + // Wait for it to connect to our socket + this.serverSocket.setSoTimeout(100000); + + try { + this.socket = this.serverSocket.accept(); + this.serverSocket.setSoTimeout(0); + + if(socket.isConnected()) + this.ready = true; + + } catch (Exception e) { + System.out.println(e); + throw new WayangException("Python worker failed to connect back.", e); + } + } catch (Exception e){ + System.out.println(e); + throw new WayangException("Python worker failed"); + } + } + + public Thread getProcess() { + return process; + } + + public Socket getSocket() { + return socket; + } + + public boolean isReady(){ + return ready; + } + + public void close(){ + try { + this.process.interrupt(); + this.socket.close(); + this.serverSocket.close(); + System.out.println("Everything closed"); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java new file mode 100644 index 00000000..cef0c148 --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.python.executor; + +import org.apache.wayang.api.python.function.PythonCode; +import org.apache.wayang.api.python.function.PythonUDF; +import org.apache.wayang.core.api.exception.WayangException; + +public class PythonWorkerManager<Input, Output> { + + private PythonUDF<Input, Output> udf; + private PythonCode serializedUDF; + private Iterable<Input> inputIterator; + + public PythonWorkerManager( + PythonUDF<Input, Output> udf, + PythonCode serializedUDF, + Iterable<Input> input + ){ + this.udf = udf; + this.serializedUDF = serializedUDF; + this.inputIterator = input; + } + + public Iterable<Output> execute(){ + PythonProcessCaller worker = new PythonProcessCaller(this.serializedUDF); + + if(worker.isReady()){ + Runnable run1 = () -> { + ProcessFeeder<Input, Output> feed = new ProcessFeeder<>( + worker.getSocket(), + this.udf, + this.serializedUDF, + this.inputIterator + ); + feed.send(); + }; + Thread lala = new Thread(run1); + lala.start(); + ProcessReceiver<Output> r = new ProcessReceiver<>(worker.getSocket()); + + //r.print(); + return r.getIterable(); + //return (Iterable<Output>) this.inputIterator; + + } else{ + + int port = worker.getSocket().getLocalPort(); + worker.close(); + throw new WayangException("Not possible to work with the 
Socket provided on port: " + port); + } + + } +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java new file mode 100644 index 00000000..cac90d8e --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.api.python.executor; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class ReaderIterator <Output> implements Iterator<Output> { + + private Output nextObj = null; + private boolean eos = false; + private boolean fst = false; + private DataInputStream stream = null; + + public ReaderIterator(DataInputStream stream) { + + this.stream = stream; + this.eos = false; + this.nextObj = null; + } + + private Output read() { + + int END_OF_DATA_SECTION = -1; + + try { + int length = this.stream.readInt(); + + if (length > 0) { + byte[] obj = new byte[length]; + stream.readFully(obj); + String s = new String(obj, StandardCharsets.UTF_8); + Output it = (Output) s; + return it; + } else if (length == END_OF_DATA_SECTION) { + this.eos = true; + return null; + } + } catch (EOFException e){ + this.eos = true; + return null; + } catch (IOException e) { + //e.printStackTrace(); + throw new RuntimeException(e); + } + return null; + } + + @Override + public boolean hasNext() { + + if(!this.eos){ + nextObj = read(); + // System.out.println(nextObj + " " + !this.eos); + /*To work with null values it is suppose to use -5 + if(this.nextObj == null){ + return false; + }*/ + + return !this.eos; + } + + return false; + } + + @Override + public Output next() { + + if(!this.eos){ + Output obj = nextObj; + nextObj = null; + return obj; + } + + throw new NoSuchElementException(); + } + +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java new file mode 100644 index 00000000..55e6a346 --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java @@ -0,0 +1,32 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.api.python.function; + +import java.io.Serializable; + +public class PythonCode implements Serializable { + + private byte[] code; + public PythonCode(byte[] code){ + this.code = code; + } + + public byte[] toByteArray(){ + return this.code; + } +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java new file mode 100644 index 00000000..d7300d26 --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.python.function; + +import org.apache.wayang.api.python.executor.PythonWorkerManager; +import org.apache.wayang.core.function.FunctionDescriptor; + +public class PythonFunctionWrapper<Input, Output> implements FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>> { + + private PythonUDF<Input, Output> myUDF; + private PythonCode serializedUDF; + + public PythonFunctionWrapper(PythonUDF<Input, Output> myUDF, PythonCode serializedUDF){ + this.myUDF = myUDF; + this.serializedUDF = serializedUDF; + } + + @Override + public Iterable<Output> apply(Iterable<Input> input) { + + PythonWorkerManager<Input, Output> manager = new PythonWorkerManager<>( + this.myUDF, + this.serializedUDF, + input + ); + Iterable<Output> output = manager.execute(); + return output; + } + +} diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java new file mode 100644 index 00000000..1f8dc5dc --- /dev/null +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.python.function; + +import org.apache.wayang.core.function.FunctionDescriptor; + +/** + * A {@link FunctionDescriptor.SerializableFunction} from {@code Iterable<Input>} to {@code Iterable<Output>}. + * Declares no members of its own; it only names that function shape. + */ +public interface PythonUDF<Input, Output> extends FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>>{ + +} diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties index ff3a99fe..5f9e8f4e 100644 --- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties +++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties @@ -15,4 +15,5 @@ # limitations under the License. 
# -# TODO: Add properties here \ No newline at end of file +wayang.api.python.worker = python/src/pywy/platforms/jvm/worker.py +wayang.api.python.path = /opt/python3 diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java new file mode 100644 index 00000000..c45cf2ea --- /dev/null +++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.api.rest.server.spring.decoder; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.wayang.api.python.function.WrappedPythonFunction; +import org.apache.wayang.basic.operators.MapPartitionsOperator; +import org.apache.wayang.basic.operators.TextFileSink; +import org.apache.wayang.basic.operators.TextFileSource; +import org.apache.wayang.basic.operators.UnionAllOperator; +import org.apache.wayang.commons.serializable.OperatorProto; +import org.apache.wayang.commons.serializable.PlanProto; +import org.apache.wayang.commons.serializable.WayangPlanProto; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.function.MapPartitionsDescriptor; +import org.apache.wayang.core.plan.wayangplan.OperatorBase; +import org.apache.wayang.core.plan.wayangplan.WayangPlan; +import org.apache.wayang.java.Java; +import org.apache.wayang.spark.Spark; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; +import java.util.stream.Collectors; +import java.util.Base64; + +public class WayangPlanBuilder { + + private WayangPlan wayangPlan; + private WayangContext wayangContext; + + public WayangPlanBuilder(FileInputStream planFile){ + try { + + WayangPlanProto plan = WayangPlanProto.parseFrom(planFile); + + this.wayangContext = buildContext(plan); + this.wayangPlan = buildPlan(plan); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public WayangPlanBuilder(String writtenPlan){ + + System.out.println(writtenPlan); + byte[] message = Base64.getDecoder().decode(writtenPlan); + System.out.println(message); + + try { + WayangPlanProto plan = WayangPlanProto.parseFrom(message); + + this.wayangContext = buildContext(plan); + this.wayangPlan = buildPlan(plan); + } catch (InvalidProtocolBufferException e) { + 
e.printStackTrace(); + } + + } + + private WayangContext buildContext(WayangPlanProto plan){ + + WayangContext ctx = new WayangContext(); +// plan.getContext().getPlatformsList().forEach(platform -> { +// if (platform.getNumber() == 0) +// ctx.with(Java.basicPlugin()); +// else if (platform.getNumber() == 1) +// ctx.with(Spark.basicPlugin()); +// }); + ctx.with(Spark.basicPlugin()); + + return ctx; + } + + private WayangPlan buildPlan(WayangPlanProto plan){ + + System.out.println(plan); + + PlanProto planProto = plan.getPlan(); + LinkedList<OperatorProto> protoList = new LinkedList<>(); + planProto.getSourcesList().forEach(protoList::addLast); + + Map<String, OperatorBase> operators = new HashMap<>(); + List<OperatorBase> sinks = new ArrayList<>(); + while(! protoList.isEmpty()) { + + OperatorProto proto = protoList.pollFirst(); + + /* Checking if protoOperator can be connected to the current WayangPlan*/ + boolean processIt; + if(proto.getType().equals("source")) processIt = true; + + else { + /* Checking if ALL predecessors were already processed */ + processIt = true; + for(String predecessor : proto.getPredecessorsList()){ + if (!operators.containsKey(predecessor)) { + processIt = false; + break; + } + } + } + + /* Operators should not be processed twice*/ + if(operators.containsKey(proto.getId())) processIt = false; + + if(processIt) { + + /* Create and store Wayang operator */ + OperatorBase operator = createOperatorByType(proto); + operators.put(proto.getId(), operator); + + /*TODO Connect with predecessors requires more details in connection slot*/ + int order = 0; + for (String pre_id : proto.getPredecessorsList()) { + + OperatorBase predecessor = operators.get(pre_id); + /* Only works without replicate topology */ + predecessor.connectTo(0, operator, order); + order++; + + if(proto.getType().toLowerCase().contains("sink")){ + sinks.add(operator); + //if(!sinks.contains(operator)) { + // sinks.add(operator); + //} + } + } + + /*List of OperatorProto 
successors + * They will be added to the protoList + * nevertheless they must be processed only if the parents are in operators list */ + List<OperatorProto> listSuccessors = planProto.getOperatorsList() + .stream() + .filter(t -> proto.getSuccessorsList().contains(t.getId())) + .collect(Collectors.toList()); + for (OperatorProto successor : listSuccessors){ + if(!protoList.contains(successor)){ + protoList.addLast(successor); + } + } + + List<OperatorProto> sinkSuccessors = planProto.getSinksList() + .stream() + .filter(t -> proto.getSuccessorsList().contains(t.getId())) + .collect(Collectors.toList()); + for (OperatorProto successor : sinkSuccessors){ + if(!protoList.contains(successor)){ + protoList.addLast(successor); + } + } + + } else { + + /* In case we cannot process it yet, It must be added again at the end*/ + protoList.addLast(proto); + } + } + + WayangPlan wayangPlan = new WayangPlan(sinks.get(0)); + return wayangPlan; + } + + public OperatorBase createOperatorByType(OperatorProto operator){ + + switch(operator.getType()){ + case "TextFileSource": + try { + String source_path = operator.getPath(); + URL url = new File(source_path).toURI().toURL(); + return new TextFileSource(url.toString()); + } catch (MalformedURLException e) { + e.printStackTrace(); + } + break; + case "TextFileSink": + try { + String sink_path = operator.getPath(); + URL url = new File(sink_path).toURI().toURL(); + return new TextFileSink<String>( + url.toString(), + String.class + ); + + } catch (MalformedURLException e) { + e.printStackTrace(); + } + break; + case "MapPartitionOperator": + return new MapPartitionsOperator<>( + new MapPartitionsDescriptor<String, String>( + new WrappedPythonFunction<String, String>( + l -> l, + operator.getUdf() + ), + String.class, + String.class + ) + ); + + case "union": + return new UnionAllOperator<String>( + String.class + ); + + } + + throw new WayangException("Operator Type not supported "+operator.getType()); + } + + public WayangContext 
getWayangContext() { + return wayangContext; + } + + public WayangPlan getWayangPlan() { + return wayangPlan; + } +} diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java new file mode 100644 index 00000000..d87044fc --- /dev/null +++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.api.rest.server.spring.general; + +import com.google.protobuf.ByteString; +import org.apache.wayang.api.python.function.WrappedPythonFunction; +import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder; +import org.apache.wayang.basic.operators.*; +import org.apache.wayang.commons.serializable.OperatorProto; +import org.apache.wayang.commons.serializable.PlanProto; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.function.MapPartitionsDescriptor; +import org.apache.wayang.core.plan.wayangplan.OperatorBase; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.wayang.core.plan.wayangplan.WayangPlan; +import org.apache.wayang.java.Java; +import org.apache.wayang.spark.Spark; + +import org.apache.wayang.commons.serializable.WayangPlanProto; +import org.springframework.web.multipart.MultipartFile; + + +@RestController +public class WayangController { + + @GetMapping("/plan/create/fromfile") + public String planFromFile( + //@RequestParam("file") MultipartFile file + ){ + + try { + FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message"); + WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream); + + /*TODO ADD id to executions*/ + wpb.getWayangContext().execute(wpb.getWayangPlan()); + + } catch (IOException e) { + e.printStackTrace(); + } 
+ + return "Builder works"; + } + + @PostMapping("/plan/create") + public String planFromMessage( + @RequestParam("message") String message + ){ + + WayangPlanBuilder wpb = new WayangPlanBuilder(message); + + /*TODO ADD id to executions*/ + wpb.getWayangContext().execute(wpb.getWayangPlan()); + + return ""; + } + + @GetMapping("/") + public String all(){ + System.out.println("detected!"); + + try { + FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message"); + WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream); + + WayangContext wc = buildContext(plan); + WayangPlan wp = buildPlan(plan); + + System.out.println("Plan!"); + System.out.println(wp.toString()); + + wc.execute(wp); + return("Works!"); + + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + return "Not working"; + } + + private WayangContext buildContext(WayangPlanProto plan){ + + WayangContext ctx = new WayangContext(); +// plan.getContext().getPlatformsList().forEach(platform -> { +// if (platform.getNumber() == 0) +// ctx.with(Java.basicPlugin()); +// else if (platform.getNumber() == 1) +// ctx.with(Spark.basicPlugin()); +// }); + ctx.with(Spark.basicPlugin()); + + return ctx; + } + + private WayangPlan buildPlan(WayangPlanProto plan){ + + System.out.println(plan); + + PlanProto planProto = plan.getPlan(); + LinkedList<OperatorProto> protoList = new LinkedList<>(); + planProto.getSourcesList().forEach(protoList::addLast); + + Map<String, OperatorBase> operators = new HashMap<>(); + List<OperatorBase> sinks = new ArrayList<>(); + while(! 
protoList.isEmpty()) { + + OperatorProto proto = protoList.pollFirst(); + + /* Checking if protoOperator can be connected to the current WayangPlan*/ + boolean processIt; + if(proto.getType().equals("source")) processIt = true; + + else { + /* Checking if ALL predecessors were already processed */ + processIt = true; + for(String predecessor : proto.getPredecessorsList()){ + if (!operators.containsKey(predecessor)) { + processIt = false; + break; + } + } + } + + /* Operators should not be processed twice*/ + if(operators.containsKey(proto.getId())) processIt = false; + + if(processIt) { + + /* Create and store Wayang operator */ + OperatorBase operator = createOperatorByType(proto); + operators.put(proto.getId(), operator); + + /*TODO Connect with predecessors requires more details in connection slot*/ + int order = 0; + for (String pre_id : proto.getPredecessorsList()) { + + OperatorBase predecessor = operators.get(pre_id); + /* Only works without replicate topology */ + predecessor.connectTo(0, operator, order); + order++; + + if(proto.getType().equals("sink")){ + sinks.add(operator); + //if(!sinks.contains(operator)) { + // sinks.add(operator); + //} + } + } + + /*List of OperatorProto successors + * They will be added to the protoList + * nevertheless they must be processed only if the parents are in operators list */ + List<OperatorProto> listSuccessors = planProto.getOperatorsList() + .stream() + .filter(t -> proto.getSuccessorsList().contains(t.getId())) + .collect(Collectors.toList()); + for (OperatorProto successor : listSuccessors){ + if(!protoList.contains(successor)){ + protoList.addLast(successor); + } + } + + List<OperatorProto> sinkSuccessors = planProto.getSinksList() + .stream() + .filter(t -> proto.getSuccessorsList().contains(t.getId())) + .collect(Collectors.toList()); + for (OperatorProto successor : sinkSuccessors){ + if(!protoList.contains(successor)){ + protoList.addLast(successor); + } + } + + } else { + + /* In case we cannot process it 
yet, It must be added again at the end*/ + protoList.addLast(proto); + } + } + + WayangPlan wayangPlan = new WayangPlan(sinks.get(0)); + return wayangPlan; + } + + public OperatorBase createOperatorByType(OperatorProto operator){ + + System.out.println("Typo: " + operator.getType()); + switch(operator.getType()){ + case "source": + try { + String source_path = operator.getPath(); + URL url = new File(source_path).toURI().toURL(); + return new TextFileSource(url.toString()); + } catch (MalformedURLException e) { + e.printStackTrace(); + } + break; + case "sink": + try { + String sink_path = operator.getPath(); + URL url = new File(sink_path).toURI().toURL(); + return new TextFileSink<String>( + url.toString(), + String.class + ); + + } catch (MalformedURLException e) { + e.printStackTrace(); + } + break; + case "reduce_by_key": + try { + /* Function to be applied in Python workers */ + ByteString function = operator.getUdf(); + + /* Has dimension or positions that compose GroupKey */ + Map<String, String> parameters = operator.getParametersMap(); + + PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator( + operator.getParametersMap(), + operator.getUdf() , + String.class, + String.class, + false + ); + + String sink_path = operator.getPath(); + URL url = new File(sink_path).toURI().toURL(); + return new TextFileSink<String>( + url.toString(), + String.class + ); + + } catch (MalformedURLException e) { + e.printStackTrace(); + } + break; + case "map_partition": + return new MapPartitionsOperator<>( + new MapPartitionsDescriptor<String, String>( + new WrappedPythonFunction<String, String>( + l -> l, + operator.getUdf() + ), + String.class, + String.class + ) + ); + + case "union": + return new UnionAllOperator<String>( + String.class + ); + + } + + throw new WayangException("Operator Type not supported"); + } + + public static URI createUri(String resourcePath) { + try { + return Thread.currentThread().getClass().getResource(resourcePath).toURI(); 
+ } catch (URISyntaxException e) { + throw new IllegalArgumentException("Illegal URI.", e); + } + + } + +}
