[FLINK-1409] Let CoRecordReader subscribe to buffer reader notifications

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95fece85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95fece85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95fece85

Branch: refs/heads/master
Commit: 95fece85d086c9a911fd5467ef1091308348cecb
Parents: c9709a8
Author: Ufuk Celebi <[email protected]>
Authored: Mon Jan 19 14:34:37 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Jan 19 15:00:48 2015 +0100

----------------------------------------------------------------------
 .../api/streamvertex/CoStreamVertex.java        |  28 +-
 .../flink/streaming/io/CoRecordReader.java      | 354 ++++++++++++-------
 2 files changed, 228 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95fece85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index a0313d5..f065d9c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase;
 import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -94,27 +94,13 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
                        }
                }
 
-               MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>> 
reader1;
-               if (inputList1.size() == 1) {
-                       reader1 = new 
MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(inputList1.get(0));
-               }
-               else if (inputList1.size() > 1) {
-                       reader1 = new 
MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(new 
UnionBufferReader(inputList1.toArray(new BufferReader[inputList1.size()])));
-               }
-               else {
-                       throw new IllegalStateException("Illegal input size for 
first input.");
-               }
+               final BufferReaderBase reader1 = inputList1.size() == 1
+                               ? inputList1.get(0)
+                               : new UnionBufferReader(inputList1.toArray(new 
BufferReader[inputList1.size()]));
 
-               MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>> 
reader2;
-               if (inputList2.size() == 1) {
-                       reader2 = new 
MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(inputList2.get(0));
-               }
-               else if (inputList2.size() > 1) {
-                       reader2 = new 
MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(new 
UnionBufferReader(inputList2.toArray(new BufferReader[inputList2.size()])));
-               }
-               else {
-                       throw new IllegalStateException("Illegal input size for 
first input.");
-               }
+               final BufferReaderBase reader2 = inputList2.size() == 1
+                               ? inputList2.get(0)
+                               : new UnionBufferReader(inputList2.toArray(new 
BufferReader[inputList2.size()]));
 
                coReader = new 
CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, 
DeserializationDelegate<StreamRecord<IN2>>>(reader1, reader2);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/95fece85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index fdb6da8..0b1b373 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -1,133 +1,221 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-import org.apache.flink.runtime.util.event.EventListener;
-
-import java.io.IOException;
-
-/**
- * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
- * types to read records effectively.
- */
-@SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends 
IOReadableWritable> implements ReaderBase {
-
-       /**
-        * Readers for the two input types
-        */
-       private MutableRecordReader<T1> reader1;
-       private MutableRecordReader<T2> reader2;
-
-       private boolean finishedReader1 = false;
-
-       private boolean finishedReader2 = false;
-
-       private boolean endOfSuperstepReader1 = false;
-
-       private boolean endOfSuperstepReader2 = false;
-
-       public CoRecordReader(MutableRecordReader<T1> reader1, 
MutableRecordReader<T2> reader2) {
-               this.reader1 = reader1;
-               this.reader2 = reader2;
-       }
-
-       @SuppressWarnings("unchecked")
-       protected int getNextRecord(T1 target1, T2 target2) throws IOException, 
InterruptedException {
-               do {
-                       if (finishedReader1 && finishedReader2) {
-                               return 0;
-                       }
-
-                       if (endOfSuperstepReader1 && endOfSuperstepReader2) {
-                               endOfSuperstepReader1 = false;
-                               endOfSuperstepReader2 = false;
-
-                               return 0;
-                       }
-
-                       if (!finishedReader1 && !endOfSuperstepReader1) {
-                               if (reader1.next(target1)) {
-                                       return 1;
-                               }
-                               else if (reader1.isFinished()) {
-                                       finishedReader1 = true;
-                               }
-                               else if (reader1.hasReachedEndOfSuperstep()) {
-                                       endOfSuperstepReader1 = true;
-                               }
-                               else {
-                                       throw new IOException("Unexpected 
return value from reader.");
-                               }
-                       }
-
-                       if (!finishedReader2 && !endOfSuperstepReader2) {
-                               if (reader2.next(target2)) {
-                                       return 2;
-                               }
-                               else if (reader2.isFinished()) {
-                                       finishedReader2 = true;
-                               }
-                               else if (reader2.hasReachedEndOfSuperstep()) {
-                                       endOfSuperstepReader2 = true;
-                               }
-                               else {
-                                       throw new IOException("Unexpected 
return value from reader.");
-                               }
-                       }
-               } while (true);
-       }
-
-       @Override
-       public boolean isFinished() {
-               return reader1.isFinished() && reader2.isFinished();
-       }
-
-       @Override
-       public void subscribeToTaskEvent(EventListener<TaskEvent> 
eventListener, Class<? extends TaskEvent> eventType) {
-               reader1.subscribeToTaskEvent(eventListener, eventType);
-               reader2.subscribeToTaskEvent(eventListener, eventType);
-       }
-
-       @Override
-       public void sendTaskEvent(TaskEvent event) throws IOException, 
InterruptedException {
-               reader1.sendTaskEvent(event);
-               reader2.sendTaskEvent(event);
-       }
-
-       @Override
-       public void setIterativeReader() {
-               reader1.setIterativeReader();
-               reader2.setIterativeReader();
-       }
-
-       @Override
-       public void startNextSuperstep() {
-               reader1.startNextSuperstep();
-               reader2.startNextSuperstep();
-       }
-
-       @Override
-       public boolean hasReachedEndOfSuperstep() {
-               return reader1.hasReachedEndOfSuperstep() && 
reader2.hasReachedEndOfSuperstep();
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase;
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
+ * types to read records effectively.
+ */
+@SuppressWarnings("rawtypes")
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends 
IOReadableWritable> implements ReaderBase, EventListener<BufferReaderBase> {
+
+       private final BufferReaderBase bufferReader1;
+
+       private final BufferReaderBase bufferReader2;
+
+       private final BlockingQueue<Integer> availableRecordReaders = new 
LinkedBlockingQueue<Integer>();
+
+       private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
+
+       private RecordDeserializer<T1> reader1currentRecordDeserializer;
+
+       private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
+
+       private RecordDeserializer<T2> reader2currentRecordDeserializer;
+
+       // 0 => none, 1 => reader (T1), 2 => reader (T2)
+       private int currentReaderIndex;
+
+       private boolean hasRequestedPartitions;
+
+       public CoRecordReader(BufferReaderBase bufferReader1, BufferReaderBase 
bufferReader2) {
+               this.bufferReader1 = bufferReader1;
+               this.bufferReader2 = bufferReader2;
+
+               this.reader1RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[bufferReader1.getNumberOfInputChannels()];
+               this.reader2RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[bufferReader2.getNumberOfInputChannels()];
+
+               for (int i = 0; i < reader1RecordDeserializers.length; i++) {
+                       reader1RecordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T1>();
+               }
+
+               for (int i = 0; i < reader2RecordDeserializers.length; i++) {
+                       reader2RecordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T2>();
+               }
+
+               bufferReader1.subscribeToReader(this);
+               bufferReader2.subscribeToReader(this);
+       }
+
+       public void requestPartitionsOnce() throws IOException {
+               if (!hasRequestedPartitions) {
+                       bufferReader1.requestPartitionsOnce();
+                       bufferReader2.requestPartitionsOnce();
+
+                       hasRequestedPartitions = true;
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       protected int getNextRecord(T1 target1, T2 target2) throws IOException, 
InterruptedException {
+
+               requestPartitionsOnce();
+
+               while (true) {
+                       if (currentReaderIndex == 0) {
+                               if ((bufferReader1.isFinished() && 
bufferReader2.isFinished())) {
+                                       return 0;
+                               }
+
+                               currentReaderIndex = 
getNextReaderIndexBlocking();
+                       }
+
+                       if (currentReaderIndex == 1) {
+                               while (true) {
+                                       if (reader1currentRecordDeserializer != 
null) {
+                                               
RecordDeserializer.DeserializationResult result = 
reader1currentRecordDeserializer.getNextRecord(target1);
+
+                                               if (result.isBufferConsumed()) {
+                                                       
reader1currentRecordDeserializer.getCurrentBuffer().recycle();
+                                                       
reader1currentRecordDeserializer = null;
+
+                                                       currentReaderIndex = 0;
+                                               }
+
+                                               if (result.isFullRecord()) {
+                                                       return 1;
+                                               }
+                                       }
+
+                                       final Buffer nextBuffer = 
bufferReader1.getNextBufferBlocking();
+                                       final int channelIndex = 
bufferReader1.getChannelIndexOfLastBuffer();
+
+                                       if (nextBuffer == null) {
+                                               currentReaderIndex = 0;
+
+                                               break;
+                                       }
+
+                                       reader1currentRecordDeserializer = 
reader1RecordDeserializers[channelIndex];
+                                       
reader1currentRecordDeserializer.setNextBuffer(nextBuffer);
+                               }
+                       }
+                       else if (currentReaderIndex == 2) {
+                               while (true) {
+                                       if (reader2currentRecordDeserializer != 
null) {
+                                               
RecordDeserializer.DeserializationResult result = 
reader2currentRecordDeserializer.getNextRecord(target2);
+
+                                               if (result.isBufferConsumed()) {
+                                                       
reader2currentRecordDeserializer.getCurrentBuffer().recycle();
+                                                       
reader2currentRecordDeserializer = null;
+
+                                                       currentReaderIndex = 0;
+                                               }
+
+                                               if (result.isFullRecord()) {
+                                                       return 2;
+                                               }
+                                       }
+
+                                       final Buffer nextBuffer = 
bufferReader2.getNextBufferBlocking();
+                                       final int channelIndex = 
bufferReader2.getChannelIndexOfLastBuffer();
+
+                                       if (nextBuffer == null) {
+                                               currentReaderIndex = 0;
+
+                                               break;
+                                       }
+
+                                       reader2currentRecordDeserializer = 
reader2RecordDeserializers[channelIndex];
+                                       
reader2currentRecordDeserializer.setNextBuffer(nextBuffer);
+                               }
+                       }
+                       else {
+                               throw new IllegalStateException("Bug: 
unexpected current reader index.");
+                       }
+               }
+       }
+
+       private int getNextReaderIndexBlocking() throws InterruptedException {
+               return availableRecordReaders.take();
+       }
+
+       // 
------------------------------------------------------------------------
+       // Data availability notifications
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void onEvent(BufferReaderBase bufferReader) {
+               if (bufferReader == bufferReader1) {
+                       availableRecordReaders.add(1);
+               }
+               else if (bufferReader == bufferReader2) {
+                       availableRecordReaders.add(2);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isFinished() {
+               return bufferReader1.isFinished() && bufferReader2.isFinished();
+       }
+
+       @Override
+       public void subscribeToTaskEvent(EventListener<TaskEvent> 
eventListener, Class<? extends TaskEvent> eventType) {
+               bufferReader1.subscribeToTaskEvent(eventListener, eventType);
+               bufferReader2.subscribeToTaskEvent(eventListener, eventType);
+       }
+
+       @Override
+       public void sendTaskEvent(TaskEvent event) throws IOException, 
InterruptedException {
+               bufferReader1.sendTaskEvent(event);
+               bufferReader2.sendTaskEvent(event);
+       }
+
+       @Override
+       public void setIterativeReader() {
+               bufferReader1.setIterativeReader();
+               bufferReader2.setIterativeReader();
+       }
+
+       @Override
+       public void startNextSuperstep() {
+               bufferReader1.startNextSuperstep();
+               bufferReader2.startNextSuperstep();
+       }
+
+       @Override
+       public boolean hasReachedEndOfSuperstep() {
+               return bufferReader1.hasReachedEndOfSuperstep() && 
bufferReader2.hasReachedEndOfSuperstep();
+       }
+}

Reply via email to