[Distributed runtime] Allow recursive union of buffer readers

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1cc30da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1cc30da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1cc30da

Branch: refs/heads/master
Commit: d1cc30da3e2a796590139da942015620c6035ddd
Parents: fb9f562
Author: Ufuk Celebi <[email protected]>
Authored: Fri Jan 16 17:32:04 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Jan 19 14:44:57 2015 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/BufferReader.java     |  8 +-
 .../io/network/api/reader/BufferReaderBase.java |  6 ++
 .../network/api/reader/UnionBufferReader.java   | 86 ++++++++++++++------
 .../api/reader/UnionBufferReaderTest.java       | 27 ++++++
 4 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index 1df7216..cb1cf5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -89,7 +89,7 @@ public final class BufferReader implements BufferReaderBase {
 
        private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
 
-       private final AtomicReference<EventListener<BufferReader>> readerListener = new AtomicReference<EventListener<BufferReader>>(null);
+       private final AtomicReference<EventListener<BufferReaderBase>> readerListener = new AtomicReference<EventListener<BufferReaderBase>>(null);
 
        // ------------------------------------------------------------------------
 
@@ -211,7 +211,8 @@ public final class BufferReader implements BufferReaderBase {
        // Consume
        // ------------------------------------------------------------------------
 
-       void requestPartitionsOnce() throws IOException {
+       @Override
+       public void requestPartitionsOnce() throws IOException {
                if (!hasRequestedPartitions) {
                        // Sanity check
                        if (totalNumberOfInputChannels != inputChannels.size()) {
@@ -367,7 +368,8 @@ public final class BufferReader implements BufferReaderBase {
                }
        }
 
-       void subscribeToReader(EventListener<BufferReader> listener) {
+       @Override
+       public void subscribeToReader(EventListener<BufferReaderBase> listener) {
                if (!this.readerListener.compareAndSet(null, listener)) {
                        throw new IllegalStateException(listener + " is already registered as a record availability listener");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
index 04fae71..863ef77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
@@ -83,4 +84,9 @@ public interface BufferReaderBase extends ReaderBase {
        int getNumberOfInputChannels();
 
        boolean isTaskEvent();
+
+       void subscribeToReader(EventListener<BufferReaderBase> listener);
+
+       void requestPartitionsOnce() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
index e1c03cc..75348e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
@@ -40,19 +40,19 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class UnionBufferReader implements BufferReaderBase {
 
-       private final BufferReader[] readers;
+       private final BufferReaderBase[] readers;
 
-       private final DataAvailabilityListener readerListener = new DataAvailabilityListener();
+       private final DataAvailabilityListener dataAvailabilityListener;
 
        // Set of readers, which are not closed yet
-       private final Set<BufferReader> remainingReaders;
+       private final Set<BufferReaderBase> remainingReaders;
 
        // Logical channel index offset for each reader
-       private final Map<BufferReader, Integer> readerToIndexOffsetMap = new HashMap<BufferReader, Integer>();
+       private final Map<BufferReaderBase, Integer> readerToIndexOffsetMap = new HashMap<BufferReaderBase, Integer>();
 
        private int totalNumInputChannels;
 
-       private BufferReader currentReader;
+       private BufferReaderBase currentReader;
 
        private int currentReaderChannelIndexOffset;
 
@@ -64,19 +64,21 @@ public class UnionBufferReader implements BufferReaderBase {
 
        private boolean isTaskEvent;
 
-       public UnionBufferReader(BufferReader... readers) {
+       public UnionBufferReader(BufferReaderBase... readers) {
                checkNotNull(readers);
                checkArgument(readers.length >= 2, "Union buffer reader must be initialized with at least two individual buffer readers");
 
                this.readers = readers;
-               this.remainingReaders = new HashSet<BufferReader>(readers.length + 1, 1.0F);
+               this.remainingReaders = new HashSet<BufferReaderBase>(readers.length + 1, 1.0F);
+
+               this.dataAvailabilityListener = new DataAvailabilityListener(this);
 
                int currentChannelIndexOffset = 0;
 
                for (int i = 0; i < readers.length; i++) {
-                       BufferReader reader = readers[i];
+                       BufferReaderBase reader = readers[i];
 
-                       reader.subscribeToReader(readerListener);
+                       reader.subscribeToReader(dataAvailabilityListener);
 
                        remainingReaders.add(reader);
                        readerToIndexOffsetMap.put(reader, currentChannelIndexOffset);
@@ -87,20 +89,26 @@ public class UnionBufferReader implements BufferReaderBase {
        }
 
        @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
+       public void requestPartitionsOnce() throws IOException {
                if (!hasRequestedPartitions) {
-                       for (BufferReader reader : readers) {
+                       for (BufferReaderBase reader : readers) {
                                reader.requestPartitionsOnce();
                        }
 
                        hasRequestedPartitions = true;
                }
+       }
+
+
+       @Override
+       public Buffer getNextBuffer() throws IOException, InterruptedException {
+               requestPartitionsOnce();
 
                do {
                        if (currentReader == null) {
                                // Finished when all readers are finished
                                if (isFinished()) {
-                                       readerListener.clear();
+                                       dataAvailabilityListener.clear();
                                        return null;
                                }
                                // Finished with superstep when all readers finished superstep
@@ -110,7 +118,7 @@ public class UnionBufferReader implements BufferReaderBase {
                                }
                                else {
                                        while (true) {
-                                               currentReader = readerListener.getNextReaderBlocking();
+                                               currentReader = dataAvailabilityListener.getNextReaderBlocking();
                                                currentReaderChannelIndexOffset = readerToIndexOffsetMap.get(currentReader);
 
                                                if (isIterative && !remainingReaders.contains(currentReader)) {
@@ -118,7 +126,7 @@ public class UnionBufferReader implements BufferReaderBase {
                                                        // of superstep event and notified the union reader
                                                        // about newer data *before* all other readers have
                                                        // done so, we delay this notifications.
-                                                       readerListener.addReader(currentReader);
+                                                       dataAvailabilityListener.addReader(currentReader);
                                                }
                                                else {
                                                        break;
@@ -169,8 +177,13 @@ public class UnionBufferReader implements BufferReaderBase {
        }
 
        @Override
+       public void subscribeToReader(EventListener<BufferReaderBase> listener) {
+               dataAvailabilityListener.registerListener(listener);
+       }
+
+       @Override
        public boolean isFinished() {
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        if (!reader.isFinished()) {
                                return false;
                        }
@@ -182,7 +195,7 @@ public class UnionBufferReader implements BufferReaderBase {
        private void resetRemainingReaders() {
                checkState(isIterative, "Tried to reset remaining reader with non-iterative reader.");
                checkState(remainingReaders.isEmpty(), "Tried to reset remaining readers, but there are some remaining readers.");
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        remainingReaders.add(reader);
                }
        }
@@ -193,14 +206,14 @@ public class UnionBufferReader implements BufferReaderBase {
 
        @Override
        public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        reader.subscribeToTaskEvent(eventListener, eventType);
                }
        }
 
        @Override
        public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException {
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        reader.sendTaskEvent(event);
                }
        }
@@ -213,21 +226,21 @@ public class UnionBufferReader implements BufferReaderBase {
        public void setIterativeReader() {
                isIterative = true;
 
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        reader.setIterativeReader();
                }
        }
 
        @Override
        public void startNextSuperstep() {
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        reader.startNextSuperstep();
                }
        }
 
        @Override
        public boolean hasReachedEndOfSuperstep() {
-               for (BufferReader reader : readers) {
+               for (BufferReaderBase reader : readers) {
                        if (!reader.hasReachedEndOfSuperstep()) {
                                return false;
                        }
@@ -240,25 +253,46 @@ public class UnionBufferReader implements BufferReaderBase {
        // Data availability notifications
        // ------------------------------------------------------------------------
 
-       private static class DataAvailabilityListener implements EventListener<BufferReader> {
+       private static class DataAvailabilityListener implements EventListener<BufferReaderBase> {
+
+               private final UnionBufferReader unionReader;
 
-               private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>();
+               private final BlockingQueue<BufferReaderBase> readersWithData = new LinkedBlockingQueue<BufferReaderBase>();
+
+               private volatile EventListener<BufferReaderBase> registeredListener;
+
+               private DataAvailabilityListener(UnionBufferReader unionReader) {
+                       this.unionReader = unionReader;
+               }
 
                @Override
-               public void onEvent(BufferReader reader) {
+               public void onEvent(BufferReaderBase reader) {
                        readersWithData.add(reader);
+
+                       if (registeredListener != null) {
+                               registeredListener.onEvent(unionReader);
+                       }
                }
 
-               BufferReader getNextReaderBlocking() throws InterruptedException {
+               BufferReaderBase getNextReaderBlocking() throws InterruptedException {
                        return readersWithData.take();
                }
 
-               void addReader(BufferReader reader) {
+               void addReader(BufferReaderBase reader) {
                        readersWithData.add(reader);
                }
 
                void clear() {
                        readersWithData.clear();
                }
+
+               void registerListener(EventListener<BufferReaderBase> listener) {
+                       if (registeredListener == null) {
+                               registeredListener = listener;
+                       }
+                       else {
+                               throw new IllegalStateException("Already registered listener.");
+                       }
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cc30da/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
index 8871d4e..a105557 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReaderTest.java
@@ -109,4 +109,31 @@ public class UnionBufferReaderTest {
 
                verifyListenerCalled(listener, 8);
        }
+
+       @Test
+       public void testGetNextBufferUnionOfUnionReader() throws Exception {
+               final MockBufferReader reader1 = new MockBufferReader();
+               final MockBufferReader reader2 = new MockBufferReader();
+
+               final UnionBufferReader unionReader = new UnionBufferReader(reader1.getMock(), reader2.getMock());
+
+               final MockBufferReader reader3 = new MockBufferReader();
+
+               final UnionBufferReader unionUnionReader = new UnionBufferReader(unionReader, reader3.getMock());
+
+               reader1.readBuffer().readBuffer().readBuffer().readEvent().readEvent().readBuffer().finish();
+
+               reader2.readEvent().readBuffer().readBuffer().readEvent().readBuffer().finish();
+
+               reader3.readBuffer().readBuffer().readEvent().readEvent().finish();
+
+               // Task event listener to be notified...
+               final EventListener<TaskEvent> listener = mock(EventListener.class);
+               unionUnionReader.subscribeToTaskEvent(listener, TestTaskEvent.class);
+
+               // Consume the reader
+               consumeAndVerify(unionUnionReader, 9);
+
+               verifyListenerCalled(listener, 6);
+       }
 }

Reply via email to