[Distributed runtime] Refactor data availability listener of UnionBufferReader 
to increase readability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb9f5620
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb9f5620
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb9f5620

Branch: refs/heads/master
Commit: fb9f5620e262faebababb77ce1c73b6e0cc0d852
Parents: 6cc3583
Author: Ufuk Celebi <[email protected]>
Authored: Fri Jan 16 11:21:47 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Jan 19 14:44:57 2015 +0100

----------------------------------------------------------------------
 .../network/api/reader/UnionBufferReader.java   | 47 +++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb9f5620/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
index b5cec0b..e1c03cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
@@ -38,11 +38,11 @@ import static com.google.common.base.Preconditions.checkState;
  * A buffer-oriented reader, which unions multiple {@link BufferReader}
  * instances.
  */
-public class UnionBufferReader implements BufferReaderBase, EventListener<BufferReader> {
+public class UnionBufferReader implements BufferReaderBase {
 
        private final BufferReader[] readers;
 
-       private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>();
+       private final DataAvailabilityListener readerListener = new DataAvailabilityListener();
 
        // Set of readers, which are not closed yet
        private final Set<BufferReader> remainingReaders;
@@ -76,7 +76,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer
                for (int i = 0; i < readers.length; i++) {
                        BufferReader reader = readers[i];
 
-                       reader.subscribeToReader(this);
+                       reader.subscribeToReader(readerListener);
 
                        remainingReaders.add(reader);
                        readerToIndexOffsetMap.put(reader, currentChannelIndexOffset);
@@ -100,7 +100,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer
                        if (currentReader == null) {
                                // Finished when all readers are finished
                                if (isFinished()) {
-                                       readersWithData.clear();
+                                       readerListener.clear();
                                        return null;
                                }
                                // Finished with superstep when all readers finished superstep
@@ -110,7 +110,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer
                                }
                                else {
                                        while (true) {
-                                               currentReader = readersWithData.take();
+                                               currentReader = readerListener.getNextReaderBlocking();
                                                currentReaderChannelIndexOffset = readerToIndexOffsetMap.get(currentReader);
 
                                                if (isIterative && !remainingReaders.contains(currentReader)) {
@@ -118,7 +118,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer
                                                        // of superstep event and notified the union reader
                                                        // about newer data *before* all other readers have
                                                        // done so, we delay this notifications.
-                                                       readersWithData.add(currentReader);
+                                                       readerListener.addReader(currentReader);
                                                }
                                                else {
                                                        break;
@@ -188,15 +188,6 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer
        }
 
        // ------------------------------------------------------------------------
-       // Notifications about available data
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void onEvent(BufferReader readerWithData) {
-               readersWithData.add(readerWithData);
-       }
-
-       // ------------------------------------------------------------------------
        // TaskEvents
        // ------------------------------------------------------------------------
 
@@ -244,4 +235,30 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer
 
                return true;
        }
+
+       // ------------------------------------------------------------------------
+       // Data availability notifications
+       // ------------------------------------------------------------------------
+
+       private static class DataAvailabilityListener implements EventListener<BufferReader> {
+
+               private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>();
+
+               @Override
+               public void onEvent(BufferReader reader) {
+                       readersWithData.add(reader);
+               }
+
+               BufferReader getNextReaderBlocking() throws InterruptedException {
+                       return readersWithData.take();
+               }
+
+               void addReader(BufferReader reader) {
+                       readersWithData.add(reader);
+               }
+
+               void clear() {
+                       readersWithData.clear();
+               }
+       }
 }

Reply via email to