[FLINK-1409] Let CoRecordReader subscribe to buffer reader notifications
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95fece85 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95fece85 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95fece85 Branch: refs/heads/master Commit: 95fece85d086c9a911fd5467ef1091308348cecb Parents: c9709a8 Author: Ufuk Celebi <[email protected]> Authored: Mon Jan 19 14:34:37 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Jan 19 15:00:48 2015 +0100 ---------------------------------------------------------------------- .../api/streamvertex/CoStreamVertex.java | 28 +- .../flink/streaming/io/CoRecordReader.java | 354 ++++++++++++------- 2 files changed, 228 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/95fece85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java index a0313d5..f065d9c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.streamvertex; import org.apache.flink.runtime.io.network.api.reader.BufferReader; -import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; +import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase; import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; @@ -94,27 +94,13 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { } } - MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>> reader1; - if (inputList1.size() == 1) { - reader1 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(inputList1.get(0)); - } - else if (inputList1.size() > 1) { - reader1 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(new UnionBufferReader(inputList1.toArray(new BufferReader[inputList1.size()]))); - } - else { - throw new IllegalStateException("Illegal input size for first input."); - } + final BufferReaderBase reader1 = inputList1.size() == 1 + ? inputList1.get(0) + : new UnionBufferReader(inputList1.toArray(new BufferReader[inputList1.size()])); - MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>> reader2; - if (inputList2.size() == 1) { - reader2 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(inputList2.get(0)); - } - else if (inputList2.size() > 1) { - reader2 = new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(new UnionBufferReader(inputList2.toArray(new BufferReader[inputList2.size()]))); - } - else { - throw new IllegalStateException("Illegal input size for first input."); - } + final BufferReaderBase reader2 = inputList2.size() == 1 + ? inputList2.get(0) + : new UnionBufferReader(inputList2.toArray(new BufferReader[inputList2.size()])); coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(reader1, reader2); } http://git-wip-us.apache.org/repos/asf/flink/blob/95fece85/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java index fdb6da8..0b1b373 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java @@ -1,133 +1,221 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.io; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.TaskEvent; -import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; -import org.apache.flink.runtime.io.network.api.reader.ReaderBase; -import org.apache.flink.runtime.util.event.EventListener; - -import java.io.IOException; - -/** - * A CoRecordReader wraps {@link MutableRecordReader}s of two different input - * types to read records effectively. - */ -@SuppressWarnings("rawtypes") -public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> implements ReaderBase { - - /** - * Readers for the two input types - */ - private MutableRecordReader<T1> reader1; - private MutableRecordReader<T2> reader2; - - private boolean finishedReader1 = false; - - private boolean finishedReader2 = false; - - private boolean endOfSuperstepReader1 = false; - - private boolean endOfSuperstepReader2 = false; - - public CoRecordReader(MutableRecordReader<T1> reader1, MutableRecordReader<T2> reader2) { - this.reader1 = reader1; - this.reader2 = reader2; - } - - @SuppressWarnings("unchecked") - protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException { - do { - if (finishedReader1 && finishedReader2) { - return 0; - } - - if (endOfSuperstepReader1 && endOfSuperstepReader2) { - endOfSuperstepReader1 = false; - endOfSuperstepReader2 = false; - - return 0; - } - - if (!finishedReader1 && !endOfSuperstepReader1) { - if (reader1.next(target1)) { - return 1; - } - else if (reader1.isFinished()) { - finishedReader1 = true; - } - else if (reader1.hasReachedEndOfSuperstep()) { - endOfSuperstepReader1 = true; - } - else { - throw new IOException("Unexpected return value from reader."); - } - } - - if (!finishedReader2 && !endOfSuperstepReader2) { - if (reader2.next(target2)) { - return 2; - } - else if (reader2.isFinished()) { - finishedReader2 = true; - } - else if (reader2.hasReachedEndOfSuperstep()) { - endOfSuperstepReader2 = true; - } - else { - throw new IOException("Unexpected return value from reader."); - } - } - } while (true); - } - - @Override - public boolean isFinished() { - return reader1.isFinished() && reader2.isFinished(); - } - - @Override - public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) { - reader1.subscribeToTaskEvent(eventListener, eventType); - reader2.subscribeToTaskEvent(eventListener, eventType); - } - - @Override - public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException { - reader1.sendTaskEvent(event); - reader2.sendTaskEvent(event); - } - - @Override - public void setIterativeReader() { - reader1.setIterativeReader(); - reader2.setIterativeReader(); - } - - @Override - public void startNextSuperstep() { - reader1.startNextSuperstep(); - reader2.startNextSuperstep(); - } - - @Override - public boolean hasReachedEndOfSuperstep() { - return reader1.hasReachedEndOfSuperstep() && reader2.hasReachedEndOfSuperstep(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.io; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase; +import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; +import org.apache.flink.runtime.io.network.api.reader.ReaderBase; +import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.util.event.EventListener; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A CoRecordReader wraps {@link MutableRecordReader}s of two different input + * types to read records effectively. + */ +@SuppressWarnings("rawtypes") +public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> implements ReaderBase, EventListener<BufferReaderBase> { + + private final BufferReaderBase bufferReader1; + + private final BufferReaderBase bufferReader2; + + private final BlockingQueue<Integer> availableRecordReaders = new LinkedBlockingQueue<Integer>(); + + private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers; + + private RecordDeserializer<T1> reader1currentRecordDeserializer; + + private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers; + + private RecordDeserializer<T2> reader2currentRecordDeserializer; + + // 0 => none, 1 => reader (T1), 2 => reader (T2) + private int currentReaderIndex; + + private boolean hasRequestedPartitions; + + public CoRecordReader(BufferReaderBase bufferReader1, BufferReaderBase bufferReader2) { + this.bufferReader1 = bufferReader1; + this.bufferReader2 = bufferReader2; + + this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1.getNumberOfInputChannels()]; + this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2.getNumberOfInputChannels()]; + + for (int i = 0; i < reader1RecordDeserializers.length; i++) { + reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>(); + } + + for (int i = 0; i < reader2RecordDeserializers.length; i++) { + reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>(); + } + + bufferReader1.subscribeToReader(this); + bufferReader2.subscribeToReader(this); + } + + public void requestPartitionsOnce() throws IOException { + if (!hasRequestedPartitions) { + bufferReader1.requestPartitionsOnce(); + bufferReader2.requestPartitionsOnce(); + + hasRequestedPartitions = true; + } + } + + @SuppressWarnings("unchecked") + protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException { + + requestPartitionsOnce(); + + while (true) { + if (currentReaderIndex == 0) { + if ((bufferReader1.isFinished() && bufferReader2.isFinished())) { + return 0; + } + + currentReaderIndex = getNextReaderIndexBlocking(); + } + + if (currentReaderIndex == 1) { + while (true) { + if (reader1currentRecordDeserializer != null) { + RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer.getNextRecord(target1); + + if (result.isBufferConsumed()) { + reader1currentRecordDeserializer.getCurrentBuffer().recycle(); + reader1currentRecordDeserializer = null; + + currentReaderIndex = 0; + } + + if (result.isFullRecord()) { + return 1; + } + } + + final Buffer nextBuffer = bufferReader1.getNextBufferBlocking(); + final int channelIndex = bufferReader1.getChannelIndexOfLastBuffer(); + + if (nextBuffer == null) { + currentReaderIndex = 0; + + break; + } + + reader1currentRecordDeserializer = reader1RecordDeserializers[channelIndex]; + reader1currentRecordDeserializer.setNextBuffer(nextBuffer); + } + } + else if (currentReaderIndex == 2) { + while (true) { + if (reader2currentRecordDeserializer != null) { + RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer.getNextRecord(target2); + + if (result.isBufferConsumed()) { + reader2currentRecordDeserializer.getCurrentBuffer().recycle(); + reader2currentRecordDeserializer = null; + + currentReaderIndex = 0; + } + + if (result.isFullRecord()) { + return 2; + } + } + + final Buffer nextBuffer = bufferReader2.getNextBufferBlocking(); + final int channelIndex = bufferReader2.getChannelIndexOfLastBuffer(); + + if (nextBuffer == null) { + currentReaderIndex = 0; + + break; + } + + reader2currentRecordDeserializer = reader2RecordDeserializers[channelIndex]; + reader2currentRecordDeserializer.setNextBuffer(nextBuffer); + } + } + else { + throw new IllegalStateException("Bug: unexpected current reader index."); + } + } + } + + private int getNextReaderIndexBlocking() throws InterruptedException { + return availableRecordReaders.take(); + } + + // ------------------------------------------------------------------------ + // Data availability notifications + // ------------------------------------------------------------------------ + + @Override + public void onEvent(BufferReaderBase bufferReader) { + if (bufferReader == bufferReader1) { + availableRecordReaders.add(1); + } + else if (bufferReader == bufferReader2) { + availableRecordReaders.add(2); + } + } + + // ------------------------------------------------------------------------ + + @Override + public boolean isFinished() { + return bufferReader1.isFinished() && bufferReader2.isFinished(); + } + + @Override + public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) { + bufferReader1.subscribeToTaskEvent(eventListener, eventType); + bufferReader2.subscribeToTaskEvent(eventListener, eventType); + } + + @Override + public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException { + bufferReader1.sendTaskEvent(event); + bufferReader2.sendTaskEvent(event); + } + + @Override + public void setIterativeReader() { + bufferReader1.setIterativeReader(); + bufferReader2.setIterativeReader(); + } + + @Override + public void startNextSuperstep() { + bufferReader1.startNextSuperstep(); + bufferReader2.startNextSuperstep(); + } + + @Override + public boolean hasReachedEndOfSuperstep() { + return bufferReader1.hasReachedEndOfSuperstep() && bufferReader2.hasReachedEndOfSuperstep(); + } +}
