Repository: flink Updated Branches: refs/heads/master 6cc35837f -> 95fece85d
[Distributed runtime] Rename read method of BufferReader to indicate blocking behaviour Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9709a84 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9709a84 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9709a84 Branch: refs/heads/master Commit: c9709a849837ea4c2f0de48fb855d9e7abd05ea1 Parents: d1cc30d Author: Ufuk Celebi <[email protected]> Authored: Mon Jan 19 12:21:15 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Jan 19 14:44:57 2015 +0100 ---------------------------------------------------------------------- .../io/network/api/reader/AbstractRecordReader.java | 2 +- .../flink/runtime/io/network/api/reader/BufferReader.java | 2 +- .../runtime/io/network/api/reader/BufferReaderBase.java | 10 +++++----- .../runtime/io/network/api/reader/UnionBufferReader.java | 4 ++-- .../runtime/io/network/api/reader/BufferReaderTest.java | 4 ++-- 5 files changed, 11 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index cf4c302..15b8dcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -75,7 +75,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> implements Rea } } - final Buffer nextBuffer = reader.getNextBuffer(); + final Buffer nextBuffer = reader.getNextBufferBlocking(); final int channelIndex = reader.getChannelIndexOfLastBuffer(); if (nextBuffer == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java index cb1cf5e..91784f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -230,7 +230,7 @@ public final class BufferReader implements BufferReaderBase { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferBlocking() throws IOException, InterruptedException { requestPartitionsOnce(); while (true) { http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java index 863ef77..d1dbefd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java @@ -30,9 +30,9 @@ import java.io.IOException; * <p> * {@link BufferReaderBase} is the runtime API for consuming results. Events * are handled by the reader and users can query for buffers with - * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}. + * {@link #getNextBufferBlocking()} or {@link #getNextBuffer(Buffer)}. * <p> - * <strong>Important</strong>: If {@link #getNextBuffer()} is used, it is + * <strong>Important</strong>: If {@link #getNextBufferBlocking()} is used, it is * necessary to release the returned buffers with {@link Buffer#recycle()} * after they are consumed. */ @@ -50,10 +50,10 @@ public interface BufferReaderBase extends ReaderBase { * * @see #getChannelIndexOfLastBuffer() */ - Buffer getNextBuffer() throws IOException, InterruptedException; + Buffer getNextBufferBlocking() throws IOException, InterruptedException; /** - * {@link #getNextBuffer()} requires the user to quickly recycle the + * {@link #getNextBufferBlocking()} requires the user to quickly recycle the * returned buffer. For a fully buffer-oriented runtime, we need to * support a variant of this method, which allows buffers to be exchanged * in order to save unnecessary memory copies between buffer pools. @@ -66,7 +66,7 @@ public interface BufferReaderBase extends ReaderBase { /** * Returns a channel index for the last {@link Buffer} instance returned by - * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}. + * {@link #getNextBufferBlocking()} or {@link #getNextBuffer(Buffer)}. * <p> * The returned index is guaranteed to be the same for all buffers read by * the same {@link RemoteInputChannel} instance. This is useful when data spans http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java index 75348e6..241e212 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java @@ -101,7 +101,7 @@ public class UnionBufferReader implements BufferReaderBase { @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferBlocking() throws IOException, InterruptedException { requestPartitionsOnce(); do { @@ -135,7 +135,7 @@ public class UnionBufferReader implements BufferReaderBase { } } - Buffer buffer = currentReader.getNextBuffer(); + Buffer buffer = currentReader.getNextBufferBlocking(); channelIndexOfLastReadBuffer = currentReaderChannelIndexOffset + currentReader.getChannelIndexOfLastBuffer(); isTaskEvent = false; http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java index e933992..0b561d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java @@ -109,7 +109,7 @@ public class BufferReaderTest { final BufferReader reader = mockReader.getMock(); // Should throw Exception, because it's a non-iterative reader - reader.getNextBuffer(); + reader.getNextBufferBlocking(); } // ------------------------------------------------------------------------ @@ -128,7 +128,7 @@ public class BufferReaderTest { while (true) { Buffer buffer; - while ((buffer = reader.getNextBuffer()) != null) { + while ((buffer = reader.getNextBufferBlocking()) != null) { buffer.recycle(); numReadBuffers++;
