[Distributed runtime] Refactor data availability listener of UnionBufferReader to increase readability
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb9f5620 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb9f5620 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb9f5620 Branch: refs/heads/master Commit: fb9f5620e262faebababb77ce1c73b6e0cc0d852 Parents: 6cc3583 Author: Ufuk Celebi <[email protected]> Authored: Fri Jan 16 11:21:47 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Jan 19 14:44:57 2015 +0100 ---------------------------------------------------------------------- .../network/api/reader/UnionBufferReader.java | 47 +++++++++++++------- 1 file changed, 32 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fb9f5620/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java index b5cec0b..e1c03cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java @@ -38,11 +38,11 @@ import static com.google.common.base.Preconditions.checkState; * A buffer-oriented reader, which unions multiple {@link BufferReader} * instances. */ -public class UnionBufferReader implements BufferReaderBase, EventListener<BufferReader> { +public class UnionBufferReader implements BufferReaderBase { private final BufferReader[] readers; - private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>(); + private final DataAvailabilityListener readerListener = new DataAvailabilityListener(); // Set of readers, which are not closed yet private final Set<BufferReader> remainingReaders; @@ -76,7 +76,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer for (int i = 0; i < readers.length; i++) { BufferReader reader = readers[i]; - reader.subscribeToReader(this); + reader.subscribeToReader(readerListener); remainingReaders.add(reader); readerToIndexOffsetMap.put(reader, currentChannelIndexOffset); @@ -100,7 +100,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer if (currentReader == null) { // Finished when all readers are finished if (isFinished()) { - readersWithData.clear(); + readerListener.clear(); return null; } // Finished with superstep when all readers finished superstep @@ -110,7 +110,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer } else { while (true) { - currentReader = readersWithData.take(); + currentReader = readerListener.getNextReaderBlocking(); currentReaderChannelIndexOffset = readerToIndexOffsetMap.get(currentReader); if (isIterative && !remainingReaders.contains(currentReader)) { @@ -118,7 +118,7 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer // of superstep event and notified the union reader // about newer data *before* all other readers have // done so, we delay this notifications. - readersWithData.add(currentReader); + readerListener.addReader(currentReader); } else { break; @@ -188,15 +188,6 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer } // ------------------------------------------------------------------------ - // Notifications about available data - // ------------------------------------------------------------------------ - - @Override - public void onEvent(BufferReader readerWithData) { - readersWithData.add(readerWithData); - } - - // ------------------------------------------------------------------------ // TaskEvents // ------------------------------------------------------------------------ @@ -244,4 +235,30 @@ public class UnionBufferReader implements BufferReaderBase, EventListener<Buffer return true; } + + // ------------------------------------------------------------------------ + // Data availability notifications + // ------------------------------------------------------------------------ + + private static class DataAvailabilityListener implements EventListener<BufferReader> { + + private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>(); + + @Override + public void onEvent(BufferReader reader) { + readersWithData.add(reader); + } + + BufferReader getNextReaderBlocking() throws InterruptedException { + return readersWithData.take(); + } + + void addReader(BufferReader reader) { + readersWithData.add(reader); + } + + void clear() { + readersWithData.clear(); + } + } }
