Repository: flink Updated Branches: refs/heads/release-1.1 8a1f16bd4 -> b62808ed7
[FLINK-6044] Replace all unintentional calls to InputStream#read(...) with InputStream#readFully(...) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b62808ed Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b62808ed Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b62808ed Branch: refs/heads/release-1.1 Commit: b62808ed7bd91d3f474b4d768c6ab8e3d86c65bf Parents: 8a1f16b Author: Stefan Richter <[email protected]> Authored: Tue Mar 14 14:45:00 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Mar 31 15:07:48 2017 +0200 ---------------------------------------------------------------------- .../api/common/typeutils/base/BigIntSerializer.java | 7 ++++--- .../core/memory/DataOutputViewStreamWrapper.java | 2 +- .../src/main/java/org/apache/flink/types/Record.java | 14 +++++++------- .../typeutils/runtime/TestDataOutputSerializer.java | 10 +++++----- .../typeutils/runtime/kryo/KryoClearedBufferTest.java | 4 +--- .../flink/runtime/util/DataOutputSerializer.java | 2 +- .../io/network/api/writer/RecordWriterTest.java | 2 +- .../apache/flink/streaming/runtime/io/TestEvent.java | 2 +- 8 files changed, 21 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java index 73b2f54..041165d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java @@ -18,12 +18,13 @@ package org.apache.flink.api.common.typeutils.base; -import java.io.IOException; -import java.math.BigInteger; import org.apache.flink.annotation.Internal; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import java.io.IOException; +import java.math.BigInteger; + /** * Serializer for serializing/deserializing BigInteger values including null values. */ @@ -130,7 +131,7 @@ public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger> } } final byte[] bytes = new byte[len - 4]; - source.read(bytes); + source.readFully(bytes); return new BigInteger(bytes); } http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java index 9ec9c29..4e45532 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java @@ -57,7 +57,7 @@ public class DataOutputViewStreamWrapper extends DataOutputStream implements Dat while (numBytes > 0) { int toCopy = Math.min(numBytes, tempBuffer.length); - source.read(tempBuffer, 0, toCopy); + source.readFully(tempBuffer, 0, toCopy); write(tempBuffer, 0, toCopy); numBytes -= toCopy; } http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-core/src/main/java/org/apache/flink/types/Record.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java index 9990ddf..c296751 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Record.java +++ b/flink-core/src/main/java/org/apache/flink/types/Record.java @@ -19,6 +19,12 @@ package org.apache.flink.types; +import org.apache.flink.annotation.Public; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemoryUtils; +import org.apache.flink.util.InstantiationUtil; + import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; @@ -27,12 +33,6 @@ import java.io.Serializable; import java.io.UTFDataFormatException; import java.nio.ByteOrder; -import org.apache.flink.annotation.Public; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; -import org.apache.flink.util.InstantiationUtil; - /** * The Record represents a multi-valued data record. @@ -1808,7 +1808,7 @@ public final class Record implements Value, CopyableValue<Record> { throw new IOException("Could not write " + numBytes + " bytes since the buffer is full."); } - source.read(this.memory,this.position, numBytes); + source.readFully(this.memory,this.position, numBytes); this.position += numBytes; } } http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java index 87be6db..d830a21 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java @@ -18,16 +18,16 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemoryUtils; + import java.io.EOFException; import java.io.IOException; import java.io.UTFDataFormatException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; - public final class TestDataOutputSerializer implements DataOutputView { private byte[] buffer; @@ -301,7 +301,7 @@ public final class TestDataOutputSerializer implements DataOutputView { throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); } - source.read(this.buffer, this.position, numBytes); + source.readFully(this.buffer, this.position, numBytes); this.position += numBytes; } http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java index 7572408..3b9bb8e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java @@ -22,12 +22,10 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; - import org.junit.Assert; import org.junit.Test; @@ -176,7 +174,7 @@ public class KryoClearedBufferTest { byte[] tempBuffer = new byte[numBytes]; - source.read(tempBuffer); + source.readFully(tempBuffer); System.arraycopy(tempBuffer, 0, buffer, position, numBytes); http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java index 18940ed..4f1cf77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java @@ -324,7 +324,7 @@ public class DataOutputSerializer implements DataOutputView { throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); } - source.read(this.buffer, this.position, numBytes); + source.readFully(this.buffer, this.position, numBytes); this.position += numBytes; } http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 4871c26..b2f70ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -505,7 +505,7 @@ public class RecordWriterTest { @Override public void read(DataInputView in) throws IOException { - in.read(bytes); + in.readFully(bytes); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b62808ed/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java index 286477a..9fcb7fe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java @@ -58,7 +58,7 @@ public class TestEvent extends AbstractEvent { public void read(DataInputView in) throws IOException { this.magicNumber = in.readLong(); this.payload = new byte[in.readInt()]; - in.read(this.payload); + in.readFully(this.payload); } // ------------------------------------------------------------------------
