Repository: flink
Updated Branches:
  refs/heads/master 6cc35837f -> 95fece85d


[Distributed runtime] Rename read method of BufferReader to indicate blocking behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9709a84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9709a84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9709a84

Branch: refs/heads/master
Commit: c9709a849837ea4c2f0de48fb855d9e7abd05ea1
Parents: d1cc30d
Author: Ufuk Celebi <[email protected]>
Authored: Mon Jan 19 12:21:15 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Jan 19 14:44:57 2015 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/AbstractRecordReader.java       |  2 +-
 .../flink/runtime/io/network/api/reader/BufferReader.java |  2 +-
 .../runtime/io/network/api/reader/BufferReaderBase.java   | 10 +++++-----
 .../runtime/io/network/api/reader/UnionBufferReader.java  |  4 ++--
 .../runtime/io/network/api/reader/BufferReaderTest.java   |  4 ++--
 5 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index cf4c302..15b8dcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -75,7 +75,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> implements Rea
                                }
                        }
 
-                       final Buffer nextBuffer = reader.getNextBuffer();
+                       final Buffer nextBuffer = reader.getNextBufferBlocking();
                        final int channelIndex = reader.getChannelIndexOfLastBuffer();
 
                        if (nextBuffer == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index cb1cf5e..91784f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -230,7 +230,7 @@ public final class BufferReader implements BufferReaderBase {
        }
 
        @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
+       public Buffer getNextBufferBlocking() throws IOException, InterruptedException {
                requestPartitionsOnce();
 
                while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
index 863ef77..d1dbefd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
@@ -30,9 +30,9 @@ import java.io.IOException;
  * <p>
  * {@link BufferReaderBase} is the runtime API for consuming results. Events
  * are handled by the reader and users can query for buffers with
- * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}.
+ * {@link #getNextBufferBlocking()} or {@link #getNextBuffer(Buffer)}.
  * <p>
- * <strong>Important</strong>: If {@link #getNextBuffer()} is used, it is
+ * <strong>Important</strong>: If {@link #getNextBufferBlocking()} is used, it is
  * necessary to release the returned buffers with {@link Buffer#recycle()}
  * after they are consumed.
  */
@@ -50,10 +50,10 @@ public interface BufferReaderBase extends ReaderBase {
         *
         * @see #getChannelIndexOfLastBuffer()
         */
-       Buffer getNextBuffer() throws IOException, InterruptedException;
+       Buffer getNextBufferBlocking() throws IOException, InterruptedException;
 
        /**
-        * {@link #getNextBuffer()} requires the user to quickly recycle the
+        * {@link #getNextBufferBlocking()} requires the user to quickly recycle the
         * returned buffer. For a fully buffer-oriented runtime, we need to
         * support a variant of this method, which allows buffers to be exchanged
         * in order to save unnecessary memory copies between buffer pools.
@@ -66,7 +66,7 @@ public interface BufferReaderBase extends ReaderBase {
 
        /**
         * Returns a channel index for the last {@link Buffer} instance returned by
-        * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}.
+        * {@link #getNextBufferBlocking()} or {@link #getNextBuffer(Buffer)}.
         * <p>
         * The returned index is guaranteed to be the same for all buffers read by
         * the same {@link RemoteInputChannel} instance. This is useful when data spans

http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
index 75348e6..241e212 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
@@ -101,7 +101,7 @@ public class UnionBufferReader implements BufferReaderBase {
 
 
        @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
+       public Buffer getNextBufferBlocking() throws IOException, InterruptedException {
                requestPartitionsOnce();
 
                do {
@@ -135,7 +135,7 @@ public class UnionBufferReader implements BufferReaderBase {
                                }
                        }
 
-                       Buffer buffer = currentReader.getNextBuffer();
+                       Buffer buffer = currentReader.getNextBufferBlocking();
                        channelIndexOfLastReadBuffer = currentReaderChannelIndexOffset + currentReader.getChannelIndexOfLastBuffer();
 
                        isTaskEvent = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/c9709a84/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index e933992..0b561d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -109,7 +109,7 @@ public class BufferReaderTest {
                final BufferReader reader = mockReader.getMock();
 
                // Should throw Exception, because it's a non-iterative reader
-               reader.getNextBuffer();
+               reader.getNextBufferBlocking();
        }
 
        // ------------------------------------------------------------------------
@@ -128,7 +128,7 @@ public class BufferReaderTest {
 
                while (true) {
                        Buffer buffer;
-                       while ((buffer = reader.getNextBuffer()) != null) {
+                       while ((buffer = reader.getNextBufferBlocking()) != null) {
                                buffer.recycle();
 
                                numReadBuffers++;

Reply via email to