[FLINK-3014] Replace InputStream.read() calls with DataInputStream.readFully()

This closes #1432


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7ada8d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7ada8d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7ada8d7

Branch: refs/heads/master
Commit: c7ada8d785087e0209071a8219ff841006b96639
Parents: fc4d946
Author: zentol <ches...@apache.org>
Authored: Sun Nov 22 16:24:30 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Wed Jan 20 07:24:46 2016 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 21 ++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7ada8d7/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 4f5fdae..1e36962 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -12,9 +12,10 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Field;
@@ -63,8 +64,8 @@ public class PythonStreamer implements Serializable {
        private Thread shutdownThread;
        protected ServerSocket server;
        protected Socket socket;
-       protected InputStream in;
-       protected OutputStream out;
+       protected DataInputStream in;
+       protected DataOutputStream out;
        protected int port;
 
        protected PythonSender sender;
@@ -155,8 +156,8 @@ public class PythonStreamer implements Serializable {
                }
 
                socket = server.accept();
-               in = socket.getInputStream();
-               out = socket.getOutputStream();
+               in = new DataInputStream(socket.getInputStream());
+               out = new DataOutputStream(socket.getOutputStream());
        }
 
        /**
@@ -242,7 +243,7 @@ public class PythonStreamer implements Serializable {
                                names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
                        }
 
-                       in.read(buffer, 0, 4);
+                       in.readFully(buffer, 0, 4);
                        checkForError();
                        int size = sender.sendRecord(broadcastCount);
                        sendWriteNotification(size, false);
@@ -250,13 +251,13 @@ public class PythonStreamer implements Serializable {
                        for (String name : names) {
                                Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
 
-                               in.read(buffer, 0, 4);
+                               in.readFully(buffer, 0, 4);
                                checkForError();
                                size = sender.sendRecord(name);
                                sendWriteNotification(size, false);
 
                                while (bcv.hasNext() || sender.hasRemaining(0)) {
-                                       in.read(buffer, 0, 4);
+                                       in.readFully(buffer, 0, 4);
                                        checkForError();
                                        size = sender.sendBuffer(bcv, 0);
                                        sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
@@ -280,7 +281,7 @@ public class PythonStreamer implements Serializable {
                        int size;
                        if (i.hasNext()) {
                                while (true) {
-                                       in.read(buffer, 0, 4);
+                                       in.readFully(buffer, 0, 4);
                                        int sig = getInt(buffer, 0);
                                        switch (sig) {
                                                case SIGNAL_BUFFER_REQUEST:
@@ -325,7 +326,7 @@ public class PythonStreamer implements Serializable {
                        int size;
                        if (i1.hasNext() || i2.hasNext()) {
                                while (true) {
-                                       in.read(buffer, 0, 4);
+                                       in.readFully(buffer, 0, 4);
                                        int sig = getInt(buffer, 0);
                                        switch (sig) {
                                                case SIGNAL_BUFFER_REQUEST_G0:

Reply via email to