[Distributed runtime] Allow recursive union of buffer readers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1cc30da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1cc30da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1cc30da Branch: refs/heads/master Commit: d1cc30da3e2a796590139da942015620c6035ddd Parents: fb9f562 Author: Ufuk Celebi <[email protected]> Authored: Fri Jan 16 17:32:04 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Jan 19 14:44:57 2015 +0100 ---------------------------------------------------------------------- .../io/network/api/reader/BufferReader.java | 8 +- .../io/network/api/reader/BufferReaderBase.java | 6 ++ .../network/api/reader/UnionBufferReader.java | 86 ++++++++++++++------ .../api/reader/UnionBufferReaderTest.java | 27 ++++++ 4 files changed, 98 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java index 1df7216..cb1cf5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -89,7 +89,7 @@ public final class BufferReader implements BufferReaderBase { private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>(); - private final AtomicReference<EventListener<BufferReader>> readerListener = new AtomicReference<EventListener<BufferReader>>(null); + private final AtomicReference<EventListener<BufferReaderBase>> readerListener = new AtomicReference<EventListener<BufferReaderBase>>(null); // ------------------------------------------------------------------------ @@ -211,7 +211,8 @@ public final class BufferReader implements BufferReaderBase { // Consume // ------------------------------------------------------------------------ - void requestPartitionsOnce() throws IOException { + @Override + public void requestPartitionsOnce() throws IOException { if (!hasRequestedPartitions) { // Sanity check if (totalNumberOfInputChannels != inputChannels.size()) { @@ -367,7 +368,8 @@ public final class BufferReader implements BufferReaderBase { } } - void subscribeToReader(EventListener<BufferReader> listener) { + @Override + public void subscribeToReader(EventListener<BufferReaderBase> listener) { if (!this.readerListener.compareAndSet(null, listener)) { throw new IllegalStateException(listener + " is already registered as a record availability listener"); } http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java index 04fae71..863ef77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.util.event.EventListener; import java.io.IOException; @@ -83,4 +84,9 @@ public interface BufferReaderBase extends ReaderBase { int getNumberOfInputChannels(); boolean isTaskEvent(); + + void subscribeToReader(EventListener<BufferReaderBase> listener); + + void requestPartitionsOnce() throws IOException; + } http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java index e1c03cc..75348e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java @@ -40,19 +40,19 @@ import static com.google.common.base.Preconditions.checkState; */ public class UnionBufferReader implements BufferReaderBase { - private final BufferReader[] readers; + private final BufferReaderBase[] readers; - private final DataAvailabilityListener readerListener = new DataAvailabilityListener(); + private final DataAvailabilityListener dataAvailabilityListener; // Set of readers, which are not closed yet - private final Set<BufferReader> remainingReaders; + private final Set<BufferReaderBase> remainingReaders; // Logical channel index offset for each reader - private final Map<BufferReader, Integer> readerToIndexOffsetMap = new HashMap<BufferReader, Integer>(); + private final Map<BufferReaderBase, Integer> readerToIndexOffsetMap = new HashMap<BufferReaderBase, Integer>(); private int totalNumInputChannels; - private BufferReader currentReader; + private BufferReaderBase currentReader; private int currentReaderChannelIndexOffset; @@ -64,19 +64,21 @@ public class UnionBufferReader implements BufferReaderBase { private boolean isTaskEvent; - public UnionBufferReader(BufferReader... readers) { + public UnionBufferReader(BufferReaderBase... readers) { checkNotNull(readers); checkArgument(readers.length >= 2, "Union buffer reader must be initialized with at least two individual buffer readers"); this.readers = readers; - this.remainingReaders = new HashSet<BufferReader>(readers.length + 1, 1.0F); + this.remainingReaders = new HashSet<BufferReaderBase>(readers.length + 1, 1.0F); + + this.dataAvailabilityListener = new DataAvailabilityListener(this); int currentChannelIndexOffset = 0; for (int i = 0; i < readers.length; i++) { - BufferReader reader = readers[i]; + BufferReaderBase reader = readers[i]; - reader.subscribeToReader(readerListener); + reader.subscribeToReader(dataAvailabilityListener); remainingReaders.add(reader); readerToIndexOffsetMap.put(reader, currentChannelIndexOffset); @@ -87,20 +89,26 @@ public class UnionBufferReader implements BufferReaderBase { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public void requestPartitionsOnce() throws IOException { if (!hasRequestedPartitions) { - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { reader.requestPartitionsOnce(); } hasRequestedPartitions = true; } + } + + + @Override + public Buffer getNextBuffer() throws IOException, InterruptedException { + requestPartitionsOnce(); do { if (currentReader == null) { // Finished when all readers are finished if (isFinished()) { - readerListener.clear(); + dataAvailabilityListener.clear(); return null; } // Finished with superstep when all readers finished superstep @@ -110,7 +118,7 @@ public class UnionBufferReader implements BufferReaderBase { } else { while (true) { - currentReader = readerListener.getNextReaderBlocking(); + currentReader = dataAvailabilityListener.getNextReaderBlocking(); currentReaderChannelIndexOffset = readerToIndexOffsetMap.get(currentReader); if (isIterative && !remainingReaders.contains(currentReader)) { @@ -118,7 +126,7 @@ public class UnionBufferReader implements BufferReaderBase { // of superstep event and notified the union reader // about newer data *before* all other readers have // done so, we delay this notifications. - readerListener.addReader(currentReader); + dataAvailabilityListener.addReader(currentReader); } else { break; @@ -169,8 +177,13 @@ public class UnionBufferReader implements BufferReaderBase { } @Override + public void subscribeToReader(EventListener<BufferReaderBase> listener) { + dataAvailabilityListener.registerListener(listener); + } + + @Override public boolean isFinished() { - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { if (!reader.isFinished()) { return false; } @@ -182,7 +195,7 @@ public class UnionBufferReader implements BufferReaderBase { private void resetRemainingReaders() { checkState(isIterative, "Tried to reset remaining reader with non-iterative reader."); checkState(remainingReaders.isEmpty(), "Tried to reset remaining readers, but there are some remaining readers."); - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { remainingReaders.add(reader); } } @@ -193,14 +206,14 @@ public class UnionBufferReader implements BufferReaderBase { @Override public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) { - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { reader.subscribeToTaskEvent(eventListener, eventType); } } @Override public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException { - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { reader.sendTaskEvent(event); } } @@ -213,21 +226,21 @@ public class UnionBufferReader implements BufferReaderBase { public void setIterativeReader() { isIterative = true; - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { reader.setIterativeReader(); } } @Override public void startNextSuperstep() { - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { reader.startNextSuperstep(); } } @Override public boolean hasReachedEndOfSuperstep() { - for (BufferReader reader : readers) { + for (BufferReaderBase reader : readers) { if (!reader.hasReachedEndOfSuperstep()) { return false; } @@ -240,25 +253,46 @@ public class UnionBufferReader implements BufferReaderBase { // Data availability notifications // ------------------------------------------------------------------------ - private static class DataAvailabilityListener implements EventListener<BufferReader> { + private static class DataAvailabilityListener implements EventListener<BufferReaderBase> { + + private final UnionBufferReader unionReader; - private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>(); + private final BlockingQueue<BufferReaderBase> readersWithData = new LinkedBlockingQueue<BufferReaderBase>(); + + private volatile EventListener<BufferReaderBase> registeredListener; + + private DataAvailabilityListener(UnionBufferReader unionReader) { + this.unionReader = unionReader; + } @Override - public void onEvent(BufferReader reader) { + public void onEvent(BufferReaderBase reader) { readersWithData.add(reader); + + if (registeredListener != null) { + registeredListener.onEvent(unionReader); + } } - BufferReader getNextReaderBlocking() throws InterruptedException { + BufferReaderBase getNextReaderBlocking() throws InterruptedException { return readersWithData.take(); } - void addReader(BufferReader reader) { + void addReader(BufferReaderBase reader) { readersWithData.add(reader); } void clear() { readersWithData.clear(); } + + void registerListener(EventListener<BufferReaderBase> listener) { + if (registeredListener == null) { + registeredListener = listener; + } + else { + throw new IllegalStateException("Already registered listener."); + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java index 8871d4e..a105557 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java @@ -109,4 +109,31 @@ public class UnionBufferReaderTest { verifyListenerCalled(listener, 8); } + + @Test + public void testGetNextBufferUnionOfUnionReader() throws Exception { + final MockBufferReader reader1 = new MockBufferReader(); + final MockBufferReader reader2 = new MockBufferReader(); + + final UnionBufferReader unionReader = new UnionBufferReader(reader1.getMock(), reader2.getMock()); + + final MockBufferReader reader3 = new MockBufferReader(); + + final UnionBufferReader unionUnionReader = new UnionBufferReader(unionReader, reader3.getMock()); + + reader1.readBuffer().readBuffer().readBuffer().readEvent().readEvent().readBuffer().finish(); + + reader2.readEvent().readBuffer().readBuffer().readEvent().readBuffer().finish(); + + reader3.readBuffer().readBuffer().readEvent().readEvent().finish(); + + // Task event listener to be notified... + final EventListener<TaskEvent> listener = mock(EventListener.class); + unionUnionReader.subscribeToTaskEvent(listener, TestTaskEvent.class); + + // Consume the reader + consumeAndVerify(unionUnionReader, 9); + + verifyListenerCalled(listener, 6); + } }
