[FLINK-3014] Replace InputStream.read() calls with DataInputStream.readFully(). This closes #1432.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7ada8d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7ada8d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7ada8d7

Branch: refs/heads/master
Commit: c7ada8d785087e0209071a8219ff841006b96639
Parents: fc4d946
Author: zentol <ches...@apache.org>
Authored: Sun Nov 22 16:24:30 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Wed Jan 20 07:24:46 2016 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java | 21 ++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c7ada8d7/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 4f5fdae..1e36962 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -12,9 +12,10 @@
  */
 package org.apache.flink.python.api.streaming.data;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Field;
@@ -63,8 +64,8 @@ public class PythonStreamer implements Serializable {
     private Thread shutdownThread;
     protected ServerSocket server;
     protected Socket socket;
-    protected InputStream in;
-    protected OutputStream out;
+    protected DataInputStream in;
+    protected DataOutputStream out;
     protected int port;
     protected PythonSender sender;
@@ -155,8 +156,8 @@ public class PythonStreamer implements Serializable {
         }
         socket = server.accept();
-        in = socket.getInputStream();
-        out = socket.getOutputStream();
+        in = new DataInputStream(socket.getInputStream());
+        out = new DataOutputStream(socket.getOutputStream());
     }
     /**
@@ -242,7 +243,7 @@ public class PythonStreamer implements Serializable {
             names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
         }
-        in.read(buffer, 0, 4);
+        in.readFully(buffer, 0, 4);
         checkForError();
         int size = sender.sendRecord(broadcastCount);
         sendWriteNotification(size, false);
@@ -250,13 +251,13 @@ public class PythonStreamer implements Serializable {
         for (String name : names) {
             Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
-            in.read(buffer, 0, 4);
+            in.readFully(buffer, 0, 4);
             checkForError();
             size = sender.sendRecord(name);
             sendWriteNotification(size, false);
             while (bcv.hasNext() || sender.hasRemaining(0)) {
-                in.read(buffer, 0, 4);
+                in.readFully(buffer, 0, 4);
                 checkForError();
                 size = sender.sendBuffer(bcv, 0);
                 sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
@@ -280,7 +281,7 @@ public class PythonStreamer implements Serializable {
         int size;
         if (i.hasNext()) {
             while (true) {
-                in.read(buffer, 0, 4);
+                in.readFully(buffer, 0, 4);
                 int sig = getInt(buffer, 0);
                 switch (sig) {
                     case SIGNAL_BUFFER_REQUEST:
@@ -325,7 +326,7 @@ public class PythonStreamer implements Serializable {
         int size;
         if (i1.hasNext() || i2.hasNext()) {
             while (true) {
-                in.read(buffer, 0, 4);
+                in.readFully(buffer, 0, 4);
                 int sig = getInt(buffer, 0);
                 switch (sig) {
                     case SIGNAL_BUFFER_REQUEST_G0: