Repository: flink Updated Branches: refs/heads/master 0ac5d4020 -> c7ada8d78
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
new file mode 100644
index 0000000..4f5fdae
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import static org.apache.flink.python.api.PythonPlanBinder.DEBUG;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
+import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This streamer is used by functions to send/receive data to/from an external python process.
+ */
+public class PythonStreamer implements Serializable {
+	protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
+	private static final int SIGNAL_BUFFER_REQUEST = 0;
+	private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
+	private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
+	private static final int SIGNAL_FINISHED = -1;
+	private static final int SIGNAL_ERROR = -2;
+	private static final byte SIGNAL_LAST = 32;
+
+	private final int id;
+	private final boolean usePython3;
+	private final boolean debug;
+	private final String planArguments;
+
+	private String inputFilePath;
+	private String outputFilePath;
+
+	private final byte[] buffer = new byte[4];
+
+	private Process process;
+	private Thread shutdownThread;
+	protected ServerSocket server;
+	protected Socket socket;
+	protected InputStream in;
+	protected OutputStream out;
+	protected int port;
+
+	protected PythonSender sender;
+	protected PythonReceiver receiver;
+
+	protected StringBuilder msg = new StringBuilder();
+
+	protected final AbstractRichFunction function;
+
+	public PythonStreamer(AbstractRichFunction function, int id) {
+		this.id = id;
+		this.usePython3 = PythonPlanBinder.usePython3;
+		this.debug = DEBUG;
+		planArguments = PythonPlanBinder.arguments.toString();
+		sender = new PythonSender();
+		receiver = new PythonReceiver();
+		this.function = function;
+	}
+
+	/**
+	 * Starts the python script.
+	 *
+	 * @throws IOException if the server socket cannot be opened or the python process cannot be started
+	 */
+	public void open() throws IOException {
+		server = new ServerSocket(0);
+		startPython();
+	}
+
+	/**
+	 * Spawns the python process (unless in debug mode), passes it the connection parameters via stdin and waits
+	 * for it to connect to {@link #server}.
+	 */
+	private void startPython() throws IOException {
+		// "_" separates id and subtask index; without it e.g. id 1/subtask 11 and id 11/subtask 1 share a file.
+		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + "_" + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + "_" + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+
+		sender.open(inputFilePath);
+		receiver.open(outputFilePath);
+
+		String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
+		String planPath = path + FLINK_PYTHON_PLAN_NAME;
+
+		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+		try {
+			// This only probes whether the binary can be launched; the probe is destroyed right away so that no
+			// idle interpreter is left running.
+			Runtime.getRuntime().exec(pythonBinaryPath).destroy();
+		} catch (IOException ex) {
+			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.", ex);
+		}
+
+		if (debug) {
+			// No process is spawned in debug mode; the user starts it manually and must supply the handshake.
+			LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
+				+ " Run python " + planPath + planArguments);
+			LOG.info("Python process expects on stdin, one per line: operator, " + server.getLocalPort() + ", " + id
+				+ ", " + inputFilePath + ", " + outputFilePath);
+		} else {
+			process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
+			new StreamPrinter(process.getInputStream()).start();
+			new StreamPrinter(process.getErrorStream(), true, msg).start();
+		}
+
+		shutdownThread = new Thread() {
+			@Override
+			public void run() {
+				try {
+					destroyProcess();
+				} catch (IOException ignored) {
+					// best effort: the JVM is shutting down anyway
+				}
+			}
+		};
+
+		Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+		if (!debug) {
+			OutputStream processOutput = process.getOutputStream();
+			processOutput.write("operator\n".getBytes());
+			processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
+			processOutput.write((id + "\n").getBytes());
+			processOutput.write((inputFilePath + "\n").getBytes());
+			processOutput.write((outputFilePath + "\n").getBytes());
+			processOutput.flush();
+		}
+
+		try { // wait a bit to catch syntax errors
+			Thread.sleep(2000);
+		} catch (InterruptedException ex) {
+			Thread.currentThread().interrupt();
+		}
+		if (!debug) {
+			try {
+				process.exitValue();
+				throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+			} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
+			}
+		}
+
+		socket = server.accept();
+		in = socket.getInputStream();
+		out = socket.getOutputStream();
+	}
+
+	/**
+	 * Closes this streamer.
+	 *
+	 * @throws IOException if killing the python process fails
+	 */
+	public void close() throws IOException {
+		try {
+			if (socket != null) {
+				socket.close();
+			}
+			if (server != null) {
+				server.close();
+			}
+			sender.close();
+			receiver.close();
+		} catch (Exception e) {
+			LOG.error("Exception occurred while closing Streamer. :" + e.getMessage(), e);
+		}
+		if (!debug) {
+			destroyProcess();
+		}
+		if (shutdownThread != null) {
+			Runtime.getRuntime().removeShutdownHook(shutdownThread);
+		}
+	}
+
+	/** Kills the python process if it is still running: {@code kill -9 <pid>} on UNIX, {@link Process#destroy()} otherwise. */
+	private void destroyProcess() throws IOException {
+		if (process == null) { // debug mode, or startPython() failed before the process was spawned
+			return;
+		}
+		try {
+			process.exitValue();
+		} catch (IllegalThreadStateException ise) { //process still active
+			if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
+				int pid;
+				try {
+					Field f = process.getClass().getDeclaredField("pid");
+					f.setAccessible(true);
+					pid = f.getInt(process);
+				} catch (Throwable e) {
+					process.destroy();
+					return;
+				}
+				String[] args = new String[]{"kill", "-9", "" + pid};
+				Runtime.getRuntime().exec(args);
+			} else {
+				process.destroy();
+			}
+		}
+	}
+
+	/** Sends {@code size} as a 4-byte int followed by a flag byte that is {@link #SIGNAL_LAST} if no more data follows. */
+	private void sendWriteNotification(int size, boolean hasNext) throws IOException {
+		byte[] tmp = new byte[5];
+		putInt(tmp, 0, size);
+		tmp[4] = hasNext ? 0 : SIGNAL_LAST;
+		out.write(tmp, 0, 5);
+		out.flush();
+	}
+
+	/** Acknowledges a received buffer by sending a single zero byte. */
+	private void sendReadConfirmation() throws IOException {
+		out.write(new byte[1], 0, 1);
+		out.flush();
+	}
+
+	/**
+	 * Reads the next 4-byte signal from the socket into {@link #buffer}.
+	 *
+	 * <p>A single {@code read} on a socket may return fewer bytes than requested, so this loops until the buffer
+	 * is full.
+	 *
+	 * @throws IOException if the stream ends before 4 bytes were read
+	 */
+	private void readSignal() throws IOException {
+		int offset = 0;
+		while (offset < buffer.length) {
+			int read = in.read(buffer, offset, buffer.length - offset);
+			if (read < 0) {
+				throw new IOException("External process for task " + function.getRuntimeContext().getTaskName() + " closed the connection." + msg);
+			}
+			offset += read;
+		}
+	}
+
+	/** Throws if the signal last read into {@link #buffer} is {@link #SIGNAL_ERROR}, after giving the error output time to arrive. */
+	private void checkForError() {
+		if (getInt(buffer, 0) == SIGNAL_ERROR) {
+			try { //wait before terminating to ensure that the complete error message is printed
+				Thread.sleep(2000);
+			} catch (InterruptedException ex) {
+				Thread.currentThread().interrupt();
+			}
+			throw new RuntimeException(
+					"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+		}
+	}
+
+	/**
+	 * Sends all broadcast-variables encoded in the configuration to the external process.
+	 *
+	 * @param config configuration object containing broadcast-variable count and names
+	 * @throws IOException
+	 */
+	public final void sendBroadCastVariables(Configuration config) throws IOException {
+		try {
+			int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+
+			String[] names = new String[broadcastCount];
+
+			for (int x = 0; x < names.length; x++) {
+				names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
+			}
+
+			readSignal();
+			checkForError();
+			int size = sender.sendRecord(broadcastCount);
+			sendWriteNotification(size, false);
+
+			for (String name : names) {
+				Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
+
+				readSignal();
+				checkForError();
+				size = sender.sendRecord(name);
+				sendWriteNotification(size, false);
+
+				while (bcv.hasNext() || sender.hasRemaining(0)) {
+					readSignal();
+					checkForError();
+					size = sender.sendBuffer(bcv, 0);
+					sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
+				}
+				sender.reset();
+			}
+		} catch (SocketTimeoutException ste) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+
+	/**
+	 * Sends all values contained in the iterator to the external process and collects all results.
+	 *
+	 * @param i iterator
+	 * @param c collector
+	 * @throws IOException
+	 */
+	public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
+		try {
+			int size;
+			if (i.hasNext()) {
+				while (true) {
+					readSignal();
+					int sig = getInt(buffer, 0);
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST:
+							if (i.hasNext() || sender.hasRemaining(0)) {
+								size = sender.sendBuffer(i, 0);
+								sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+							} else {
+								throw new RuntimeException("External process requested data even though none is available.");
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try { //wait before terminating to ensure that the complete error message is printed
+								Thread.sleep(2000);
+							} catch (InterruptedException ex) {
+								Thread.currentThread().interrupt();
+							}
+							throw new RuntimeException(
+									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ste) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+
+	/**
+	 * Sends all values contained in both iterators to the external process and collects all results.
+	 *
+	 * @param i1 iterator
+	 * @param i2 iterator
+	 * @param c collector
+	 * @throws IOException
+	 */
+	public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
+		try {
+			int size;
+			if (i1.hasNext() || i2.hasNext()) {
+				while (true) {
+					readSignal();
+					int sig = getInt(buffer, 0);
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST_G0:
+							if (i1.hasNext() || sender.hasRemaining(0)) {
+								size = sender.sendBuffer(i1, 0);
+								sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
+							}
+							break;
+						case SIGNAL_BUFFER_REQUEST_G1:
+							if (i2.hasNext() || sender.hasRemaining(1)) {
+								size = sender.sendBuffer(i2, 1);
+								sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try { //wait before terminating to ensure that the complete error message is printed
+								Thread.sleep(2000);
+							} catch (InterruptedException ex) {
+								Thread.currentThread().interrupt();
+							}
+							throw new RuntimeException(
+									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ste) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+
+	/** Decodes a big-endian int from the 4 bytes of {@code array} starting at {@code offset}. */
+	protected final static int getInt(byte[] array, int offset) {
+		return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
+	}
+
+	/** Encodes {@code value} big-endian into the 4 bytes of {@code array} starting at {@code offset}. */
+	protected final static void putInt(byte[] array, int offset, int value) {
+		array[offset] = (byte) (value >> 24);
+		array[offset + 1] = (byte) (value >> 16);
+		array[offset + 2] = (byte) (value >> 8);
+		array[offset + 3] = (byte) (value);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
new file mode 100644
index 0000000..ed02ce4
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.plan;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+/**
+ * Instances of this class can be used to receive data from the plan process.
+ */
+public class PythonPlanReceiver implements Serializable {
+	private final DataInputStream input;
+
+	public PythonPlanReceiver(InputStream input) {
+		this.input = new DataInputStream(input);
+	}
+
+	/**
+	 * Reads the next record without normalizing numeric types.
+	 *
+	 * @return the deserialized record
+	 * @throws IOException if reading from the stream fails
+	 */
+	public Object getRecord() throws IOException {
+		return getRecord(false);
+	}
+
+	/**
+	 * Reads the next record.
+	 *
+	 * @param normalized if true, shorts and longs are returned as {@code Integer} and floats as {@code Double}
+	 * @return the deserialized record
+	 * @throws IOException if reading from the stream fails
+	 */
+	public Object getRecord(boolean normalized) throws IOException {
+		return receiveField(normalized);
+	}
+
+	/** Reads one type-tagged field; tuple fields are read recursively. */
+	private Object receiveField(boolean normalized) throws IOException {
+		byte type = input.readByte();
+		switch (type) {
+			case TYPE_TUPLE:
+				int tupleSize = input.readByte();
+				Tuple tuple = createTuple(tupleSize);
+				for (int x = 0; x < tupleSize; x++) {
+					tuple.setField(receiveField(normalized), x);
+				}
+				return tuple;
+			case TYPE_BOOLEAN:
+				return input.readByte() == 1;
+			case TYPE_BYTE:
+				return input.readByte();
+			case TYPE_SHORT:
+				if (normalized) {
+					return (int) input.readShort();
+				} else {
+					return input.readShort();
+				}
+			case TYPE_INTEGER:
+				return input.readInt();
+			case TYPE_LONG:
+				if (normalized) {
+					// narrowing cast: values outside the int range wrap around (same as Long.intValue())
+					return (int) input.readLong();
+				} else {
+					return input.readLong();
+				}
+			case TYPE_FLOAT:
+				if (normalized) {
+					return (double) input.readFloat();
+				} else {
+					return input.readFloat();
+				}
+			case TYPE_DOUBLE:
+				return input.readDouble();
+			case TYPE_STRING:
+				int stringSize = input.readInt();
+				byte[] string = new byte[stringSize];
+				input.readFully(string);
+				// explicit charset instead of the platform default; assumes the python side sends UTF-8 - confirm
+				return new String(string, StandardCharsets.UTF_8);
+			case TYPE_BYTES:
+				int bytessize = input.readInt();
+				byte[] bytes = new byte[bytessize];
+				input.readFully(bytes);
+				return bytes;
+			case TYPE_NULL:
+				return null;
+			default:
+				int size = input.readInt();
+				byte[] data = new byte[size];
+				input.readFully(data);
+				return new CustomTypeWrapper(type, data);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
new file mode 100644
index 0000000..16a1eba
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.plan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
+import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+/**
+ * Instances of this class can be used to send data to the plan process.
+ */
+public class PythonPlanSender implements Serializable {
+	private final DataOutputStream output;
+
+	public PythonPlanSender(OutputStream output) {
+		this.output = new DataOutputStream(output);
+	}
+
+	/**
+	 * Serializes {@code record}, prefixed by its type id, to the plan process.
+	 *
+	 * <p>Strings, characters and byte arrays are written as a 4-byte length followed by the payload (strings
+	 * UTF-8 encoded), which is how the python TypedIterator reads them.
+	 *
+	 * @param record the record to send; may be {@code null}
+	 * @throws IOException if writing to the stream fails
+	 * @throws IllegalArgumentException if the type of {@code record} is not supported
+	 */
+	public void sendRecord(Object record) throws IOException {
+		if (record == null) {
+			output.write(TYPE_NULL);
+			return;
+		}
+		// Locale.ROOT: under e.g. a Turkish default locale "Integer" would not upper-case to "INTEGER".
+		String className = record.getClass().getSimpleName().toUpperCase(Locale.ROOT);
+		if (className.startsWith("TUPLE")) {
+			className = "TUPLE";
+		}
+		if (className.startsWith("BYTE[]")) {
+			className = "BYTES";
+		}
+		SupportedTypes type;
+		try {
+			type = SupportedTypes.valueOf(className);
+		} catch (IllegalArgumentException iae) {
+			throw new IllegalArgumentException("Unknown Type encountered: " + className, iae);
+		}
+		switch (type) {
+			case TUPLE:
+				output.write(TYPE_TUPLE);
+				int arity = ((Tuple) record).getArity();
+				output.writeInt(arity);
+				for (int x = 0; x < arity; x++) {
+					sendRecord(((Tuple) record).getField(x));
+				}
+				return;
+			case BOOLEAN:
+				output.write(TYPE_BOOLEAN);
+				output.write(((Boolean) record) ? (byte) 1 : (byte) 0);
+				return;
+			case BYTE:
+				output.write(TYPE_BYTE);
+				output.write((Byte) record);
+				return;
+			case BYTES:
+				output.write(TYPE_BYTES);
+				output.writeInt(((byte[]) record).length);
+				output.write((byte[]) record);
+				return;
+			case CHARACTER:
+			case STRING:
+				output.write(TYPE_STRING);
+				sendString(record.toString());
+				return;
+			case SHORT:
+				// NOTE(review): the python TypedIterator has no TYPE_SHORT branch; confirm it can read this.
+				output.write(TYPE_SHORT);
+				output.writeShort((Short) record);
+				return;
+			case INTEGER:
+				output.write(TYPE_INTEGER);
+				output.writeInt((Integer) record);
+				return;
+			case LONG:
+				output.write(TYPE_LONG);
+				output.writeLong((Long) record);
+				return;
+			case FLOAT:
+				output.write(TYPE_FLOAT);
+				output.writeFloat((Float) record);
+				return;
+			case DOUBLE:
+				output.write(TYPE_DOUBLE);
+				output.writeDouble((Double) record);
+				return;
+			case NULL:
+				output.write(TYPE_NULL);
+				return;
+			case CUSTOMTYPEWRAPPER:
+				output.write(((CustomTypeWrapper) record).getType());
+				output.write(((CustomTypeWrapper) record).getData());
+				return;
+			default:
+				throw new IllegalArgumentException("Unknown Type encountered: " + type);
+		}
+	}
+
+	/** Writes {@code value} as a 4-byte length followed by its UTF-8 encoded bytes. */
+	private void sendString(String value) throws IOException {
+		byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
+		output.writeInt(encoded.length);
+		output.write(encoded);
+	}
+
+	private enum SupportedTypes {
+		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
new file mode 100644
index 0000000..ecbc7f4
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.plan;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.ServerSocket;
+import java.net.Socket;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
+import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
+
+/**
+ * Generic class to exchange data during the plan phase.
+ */
+public class PythonPlanStreamer implements Serializable {
+	protected PythonPlanSender sender;
+	protected PythonPlanReceiver receiver;
+
+	private Process process;
+	private ServerSocket server;
+	private Socket socket;
+
+	public Object getRecord() throws IOException {
+		return getRecord(false);
+	}
+
+	public Object getRecord(boolean normalize) throws IOException {
+		return receiver.getRecord(normalize);
+	}
+
+	public void sendRecord(Object record) throws IOException {
+		sender.sendRecord(record);
+	}
+
+	/**
+	 * Starts the python plan process and waits until it connects back.
+	 *
+	 * @param tmpPath path prefix the plan file name is appended to
+	 * @param args arguments appended to the python command line
+	 * @throws IOException if the server socket or the process cannot be started
+	 */
+	public void open(String tmpPath, String args) throws IOException {
+		server = new ServerSocket(0);
+		startPython(tmpPath, args);
+		socket = server.accept();
+		sender = new PythonPlanSender(socket.getOutputStream());
+		receiver = new PythonPlanReceiver(socket.getInputStream());
+	}
+
+	private void startPython(String tmpPath, String args) throws IOException {
+		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+		try {
+			// Probe only: destroy the probe right away so that no idle interpreter is left running.
+			Runtime.getRuntime().exec(pythonBinaryPath).destroy();
+		} catch (IOException ex) {
+			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.", ex);
+		}
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tmpPath + FLINK_PYTHON_PLAN_NAME + args);
+
+		new StreamPrinter(process.getInputStream()).start();
+		new StreamPrinter(process.getErrorStream()).start();
+
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException ex) {
+			Thread.currentThread().interrupt();
+		}
+
+		try {
+			// exitValue() throws while the process is still running, which is the expected case.
+			if (process.exitValue() != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			throw new RuntimeException("Plan file exited prematurely without an error.");
+		} catch (IllegalThreadStateException ise) {
+			// process still running
+		}
+
+		process.getOutputStream().write("plan\n".getBytes());
+		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
+		process.getOutputStream().flush();
+	}
+
+	/** Stops the plan process if it is still running and closes the sockets. */
+	public void close() {
+		if (process != null) { // null if open() failed before the process was started
+			try {
+				process.exitValue();
+			} catch (IllegalThreadStateException ise) { //process still active
+				process.destroy();
+			}
+		}
+		closeQuietly(socket);
+		closeQuietly(server);
+	}
+
+	/** Closes {@code closeable} if it is non-null; failures are ignored since this only runs during cleanup. */
+	private static void closeQuietly(Closeable closeable) {
+		if (closeable != null) {
+			try {
+				closeable.close();
+			} catch (IOException ignored) {
+				// best effort cleanup
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
new file mode 100644
index 0000000..5ff3572
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Thread that drains an input stream line by line: each line is either printed to stdout or, if
+ * {@code wrapInException} is set, appended to the given {@link StringBuilder}.
+ */
+public class StreamPrinter extends Thread {
+	private final BufferedReader reader;
+	private final boolean wrapInException;
+	private final StringBuilder msg;
+
+	public StreamPrinter(InputStream stream) {
+		this(stream, false, null);
+	}
+
+	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
+		this.reader = new BufferedReader(new InputStreamReader(stream));
+		this.wrapInException = wrapInException;
+		this.msg = msg;
+	}
+
+	@Override
+	public void run() {
+		String line;
+		try {
+			if (wrapInException) {
+				while ((line = reader.readLine()) != null) {
+					msg.append('\n').append(line);
+				}
+			} else {
+				while ((line = reader.readLine()) != null) {
+					System.out.println(line);
+					System.out.flush();
+				}
+			}
+		} catch (IOException ignored) {
+			// best effort: stop draining once the process' stream can no longer be read
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 143cea7..680f495 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -51,33 +51,21 @@ def recv_all(socket, toread):
     return b"".join(bits)
 
 
-class OneWayBusyBufferingMappedFileConnection(object):
-    def __init__(self, output_path):
-        self._output_file = open(output_path, "rb+")
-        if hasattr(mmap, 'MAP_SHARED'):
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-        else:
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE)
-
-        self._out = deque()
-        self._out_size = 0
-
-        self._offset_limit = MAPPED_FILE_SIZE - 1024 * 1024 * 3
+class PureTCPConnection(object):
+    """Plain TCP connection to a port on localhost (used in plan mode by Environment)."""
+    def __init__(self, port):
+        self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
+        self._socket.connect((SOCKET.gethostbyname("localhost"), port))
 
     def write(self, msg):
-        self._out.append(msg)
-        self._out_size += len(msg)
-        if self._out_size > self._offset_limit:
-            self._write_buffer()
+        # send() may transmit only part of msg; sendall() keeps sending until all of it is written
+        self._socket.sendall(msg)
 
-    def _write_buffer(self):
-        self._file_output_buffer.seek(1, 0)
-        self._file_output_buffer.write(b"".join(self._out))
-        self._file_output_buffer.seek(0, 0)
-        self._file_output_buffer.write(b'\x01')
+    def read(self, size):
+        return recv_all(self._socket, size)
 
     def close(self):
-        self._file_output_buffer.close()
+        self._socket.close()
 
 
 class BufferingTCPMappedFileConnection(object):
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
index 0e740cf..3425cfa 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
@@ -296,3 +296,45 @@ class StringDeserializer(object):
 class NullDeserializer(object):
     def deserialize(self):
         return None
+
+
+class TypedIterator(object):
+    """Reads type-tagged values from a connection."""
+    def __init__(self, con, env):
+        self._connection = con
+        self._env = env
+
+    def next(self):
+        """Reads and returns the next value; tuple elements are read recursively."""
+        read = self._connection.read
+        type_id = read(1)
+        if type_id == Types.TYPE_TUPLE:
+            arity = unpack(">i", read(4))[0]
+            return tuple(self.next() for _ in range(arity))
+        if type_id == Types.TYPE_BYTE:
+            return unpack(">c", read(1))[0]
+        if type_id == Types.TYPE_BYTES:
+            size = unpack(">i", read(4))[0]
+            return bytearray(read(size)) if size else bytearray(b"")
+        if type_id == Types.TYPE_BOOLEAN:
+            return unpack(">?", read(1))[0]
+        if type_id == Types.TYPE_FLOAT:
+            return unpack(">f", read(4))[0]
+        if type_id == Types.TYPE_DOUBLE:
+            return unpack(">d", read(8))[0]
+        if type_id == Types.TYPE_INTEGER:
+            return unpack(">i", read(4))[0]
+        if type_id == Types.TYPE_LONG:
+            return unpack(">q", read(8))[0]
+        if type_id == Types.TYPE_STRING:
+            length = unpack(">i", read(4))[0]
+            return read(length).decode("utf-8") if length else ""
+        if type_id == Types.TYPE_NULL:
+            return None
+        # not a built-in type: fall back to the custom types known to the environment
+        for entry in self._env._types:
+            if type_id == entry[0]:
+                return entry[3]()
+        raise Exception("Unable to find deserializer for type ID " + str(type_id))
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 865487d..472592a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -17,6 +17,7 @@ ################################################################################
 from flink.connection import Connection
 from flink.connection import Collector
+from flink.connection import Iterator
 from flink.plan.DataSet import DataSet
 from flink.plan.Constants import _Identifier
 from flink.plan.OperationInfo import OperationInfo
@@ -156,12 +157,14 @@ class Environment(object):
         plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
 
         if plan_mode:
-            output_path = sys.stdin.readline().rstrip('\n')
-            self._connection = Connection.OneWayBusyBufferingMappedFileConnection(output_path)
+            port = int(sys.stdin.readline().rstrip('\n'))
+            self._connection = Connection.PureTCPConnection(port)
+            self._iterator = Iterator.TypedIterator(self._connection, self)
             self._collector = Collector.TypedCollector(self._connection, self)
             self._send_plan()
-            self._connection._write_buffer()
+            result = self._receive_result()
             self._connection.close()
+            return result
         else:
             import struct
             operator = None
@@ -383,3 +386,19 @@ class Environment(object):
                 collect(entry.parent.id)
                 collect(entry.other.id)
                 collect(entry.name)
+
+    def _receive_result(self):
+        """Reads the value sent back by the Java side after the plan and wraps it in a JobExecutionResult."""
+        jer = JobExecutionResult()
+        jer._net_runtime = self._iterator.next()
+        return jer
+
+
+class JobExecutionResult(object):
+    """Result returned from plan-mode execution; filled in by Environment._receive_result."""
+    def __init__(self):
+        self._net_runtime = 0
+
+    def get_net_runtime(self):
+        """Returns the received net runtime (0 until set); its unit is not visible here."""
+        return self._net_runtime