[FLINK-5169] [network] Make consumption of InputChannels fair
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cfce175 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cfce175 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cfce175 Branch: refs/heads/release-1.1 Commit: 6cfce17588051281226de9fdff65dcd2476c1460 Parents: e2c53cf Author: Stephan Ewen <[email protected]> Authored: Mon Nov 28 09:59:29 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Nov 28 21:05:00 2016 +0100 ---------------------------------------------------------------------- .../io/network/api/reader/BufferReader.java | 56 --- .../io/network/netty/PartitionRequestQueue.java | 255 +++++------- .../netty/PartitionRequestServerHandler.java | 40 +- .../netty/SequenceNumberingViewReader.java | 130 +++++++ .../partition/BufferAvailabilityListener.java | 33 ++ .../partition/PipelinedSubpartition.java | 148 ++++--- .../partition/PipelinedSubpartitionView.java | 18 +- .../io/network/partition/ResultPartition.java | 9 +- .../partition/ResultPartitionManager.java | 5 +- .../partition/ResultPartitionProvider.java | 3 +- .../network/partition/ResultSubpartition.java | 6 +- .../partition/ResultSubpartitionView.java | 9 +- .../partition/SpillableSubpartition.java | 177 +++++---- .../partition/SpillableSubpartitionView.java | 210 +++++----- .../partition/SpilledSubpartitionView.java | 223 +++++++++++ .../SpilledSubpartitionViewAsyncIO.java | 383 ------------------- .../SpilledSubpartitionViewSyncIO.java | 196 ---------- .../partition/consumer/BufferOrEvent.java | 25 +- .../partition/consumer/InputChannel.java | 43 ++- .../network/partition/consumer/InputGate.java | 3 +- .../partition/consumer/InputGateListener.java | 35 ++ .../partition/consumer/LocalInputChannel.java | 210 +++++----- .../partition/consumer/RemoteInputChannel.java | 80 ++-- .../partition/consumer/SingleInputGate.java | 104 +++-- .../partition/consumer/UnionInputGate.java | 98 ++--- 
.../partition/consumer/UnknownInputChannel.java | 9 +- .../apache/flink/runtime/taskmanager/Task.java | 1 - 27 files changed, 1147 insertions(+), 1362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java deleted file mode 100644 index debb352..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.api.reader; - -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; - -import java.io.IOException; - -/** - * A buffer-oriented reader. - */ -public final class BufferReader extends AbstractReader { - - public BufferReader(InputGate gate) { - super(gate); - } - - public Buffer getNextBuffer() throws IOException, InterruptedException { - while (true) { - final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); - - if (bufferOrEvent.isBuffer()) { - return bufferOrEvent.getBuffer(); - } - else { - if (handleEvent(bufferOrEvent.getEvent())) { - return null; - } - } - } - } - - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index 094c9c7..dc80675 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -27,10 +27,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; import 
org.apache.flink.runtime.io.network.partition.ProducerFailedException; -import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; -import org.apache.flink.runtime.util.event.NotificationListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +39,10 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.Set; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; /** - * A queue of partition queues, which listens for channel writability changed + * A nonEmptyReader of partition queues, which listens for channel writability changed * events before writing and flushing {@link Buffer} instances. */ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { @@ -52,12 +51,10 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); - private final Queue<SequenceNumberingSubpartitionView> queue = new ArrayDeque<SequenceNumberingSubpartitionView>(); + private final Queue<SequenceNumberingViewReader> nonEmptyReader = new ArrayDeque<>(); private final Set<InputChannelID> released = Sets.newHashSet(); - private SequenceNumberingSubpartitionView currentPartitionQueue; - private boolean fatalError; private ChannelHandlerContext ctx; @@ -71,8 +68,22 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { super.channelRegistered(ctx); } - public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception { - ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId)); + void notifyReaderNonEmpty(final SequenceNumberingViewReader reader) { + // 
The notification might come from the same thread. For the initial writes this + // might happen before the reader has set its reference to the view, because + // creating the queue and the initial notification happen in the same method call. + // This can be resolved by separating the creation of the view and allowing + // notifications. + + // TODO This could potentially have a bad performance impact as in the + // worst case (network consumes faster than the producer) each buffer + // will trigger a separate event loop task being scheduled. + ctx.executor().execute(new Runnable() { + @Override + public void run() { + ctx.pipeline().fireUserEventTriggered(reader); + } + }); } public void cancel(InputChannelID receiverId) { @@ -87,45 +98,37 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg.getClass() == SequenceNumberingSubpartitionView.class) { - boolean triggerWrite = queue.isEmpty(); - - queue.add((SequenceNumberingSubpartitionView) msg); - + // The user event triggered event loop callback is used for thread-safe + // hand over of reader queues and cancelled producers. + + if (msg.getClass() == SequenceNumberingViewReader.class) { + // Queue a non-empty reader for consumption. If the queue + // is empty, we try to trigger the actual write. Otherwise this + // will be handled by the writeAndFlushNextMessageIfPossible calls. + boolean triggerWrite = nonEmptyReader.isEmpty(); + nonEmptyReader.add((SequenceNumberingViewReader) msg); + if (triggerWrite) { writeAndFlushNextMessageIfPossible(ctx.channel()); } - } - else if (msg.getClass() == InputChannelID.class) { + } else if (msg.getClass() == InputChannelID.class) { + // Release the partition view that received a cancel request. 
InputChannelID toCancel = (InputChannelID) msg; - if (released.contains(toCancel)) { return; } // Cancel the request for the input channel - if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) { - currentPartitionQueue.releaseAllResources(); - markAsReleased(currentPartitionQueue.receiverId); - currentPartitionQueue = null; - } - else { - int size = queue.size(); - - for (int i = 0; i < size; i++) { - SequenceNumberingSubpartitionView curr = queue.poll(); - - if (curr.getReceiverId().equals(toCancel)) { - curr.releaseAllResources(); - markAsReleased(curr.receiverId); - } - else { - queue.add(curr); - } + int size = nonEmptyReader.size(); + for (int i = 0; i < size; i++) { + SequenceNumberingViewReader reader = nonEmptyReader.poll(); + if (reader.getReceiverId().equals(toCancel)) { + reader.releaseAllResources(); + markAsReleased(reader.getReceiverId()); + } else { + nonEmptyReader.add(reader); } } - } - else { + } else { ctx.fireUserEventTriggered(msg); } } @@ -140,64 +143,84 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { return; } - Buffer buffer = null; + // The logic here is very similar to the combined input gate and local + // input channel logic. You can think of this class acting as the input + // gate and the consumed views as the local input channels. + BufferAndAvailability next = null; try { if (channel.isWritable()) { while (true) { - if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) { + SequenceNumberingViewReader reader = nonEmptyReader.poll(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. 
+ if (reader == null) { return; } - buffer = currentPartitionQueue.getNextBuffer(); + next = reader.getNextBuffer(); - if (buffer == null) { - if (currentPartitionQueue.registerListener(null)) { - currentPartitionQueue = null; - } - else if (currentPartitionQueue.isReleased()) { - markAsReleased(currentPartitionQueue.getReceiverId()); - - Throwable cause = currentPartitionQueue.getFailureCause(); + if (next == null) { + if (reader.isReleased()) { + markAsReleased(reader.getReceiverId()); + Throwable cause = reader.getFailureCause(); if (cause != null) { - ctx.writeAndFlush(new NettyMessage.ErrorResponse( - new ProducerFailedException(cause), - currentPartitionQueue.receiverId)); - } + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - currentPartitionQueue = null; + ctx.writeAndFlush(msg); + } + } else { + IllegalStateException err = new IllegalStateException( + "Bug in Netty consumer logic: reader queue got notified by partition " + + "about available data, but none was available."); + handleException(ctx.channel(), err); + return; + } + } else { + // this channel was now removed from the non-empty reader queue + // we re-add it in case it has more data, because in that case no + // "non-empty" notification will come for that reader from the queue. 
+ if (next.moreAvailable()) { + nonEmptyReader.add(reader); } - } - else { - BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId()); - if (!buffer.isBuffer() && - EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) { + BufferResponse msg = new BufferResponse( + next.buffer(), + reader.getSequenceNumber(), + reader.getReceiverId()); - currentPartitionQueue.notifySubpartitionConsumed(); - currentPartitionQueue.releaseAllResources(); - markAsReleased(currentPartitionQueue.getReceiverId()); + if (isEndOfPartitionEvent(next.buffer())) { + reader.notifySubpartitionConsumed(); + reader.releaseAllResources(); - currentPartitionQueue = null; + markAsReleased(reader.getReceiverId()); } - channel.writeAndFlush(resp).addListener(writeListener); + // Write and flush and wait until this is done before + // trying to continue with the next buffer. + channel.writeAndFlush(msg).addListener(writeListener); return; } } } - } - catch (Throwable t) { - if (buffer != null) { - buffer.recycle(); + } catch (Throwable t) { + if (next != null) { + next.buffer().recycle(); } throw new IOException(t.getMessage(), t); } } + private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException { + return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class; + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { releaseAllResources(); @@ -215,22 +238,15 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { releaseAllResources(); if (channel.isActive()) { - channel.writeAndFlush(new NettyMessage.ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE); + channel.writeAndFlush(new ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE); } } private void releaseAllResources() throws IOException { - if (currentPartitionQueue != null) { 
- currentPartitionQueue.releaseAllResources(); - markAsReleased(currentPartitionQueue.getReceiverId()); - - currentPartitionQueue = null; - } - - while ((currentPartitionQueue = queue.poll()) != null) { - currentPartitionQueue.releaseAllResources(); - - markAsReleased(currentPartitionQueue.getReceiverId()); + SequenceNumberingViewReader reader; + while ((reader = nonEmptyReader.poll()) != null) { + reader.releaseAllResources(); + markAsReleased(reader.getReceiverId()); } } @@ -241,7 +257,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { released.add(receiverId); } - // This listener is called after an element of the current queue has been + // This listener is called after an element of the current nonEmptyReader has been // flushed. If successful, the listener triggers further processing of the // queues. private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener { @@ -251,87 +267,14 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { try { if (future.isSuccess()) { writeAndFlushNextMessageIfPossible(future.channel()); - } - else if (future.cause() != null) { + } else if (future.cause() != null) { handleException(future.channel(), future.cause()); - } - else { + } else { handleException(future.channel(), new IllegalStateException("Sending cancelled by user.")); } - } - catch (Throwable t) { + } catch (Throwable t) { handleException(future.channel(), t); } } } - - /** - * Simple wrapper for the partition queue iterator, which increments a - * sequence number for each returned buffer and remembers the receiver ID. 
- */ - private class SequenceNumberingSubpartitionView implements ResultSubpartitionView, NotificationListener { - - private final ResultSubpartitionView queueIterator; - - private final InputChannelID receiverId; - - private int sequenceNumber = -1; - - private SequenceNumberingSubpartitionView(ResultSubpartitionView queueIterator, InputChannelID receiverId) { - this.queueIterator = checkNotNull(queueIterator); - this.receiverId = checkNotNull(receiverId); - } - - private InputChannelID getReceiverId() { - return receiverId; - } - - private int getSequenceNumber() { - return sequenceNumber; - } - - @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { - Buffer buffer = queueIterator.getNextBuffer(); - - if (buffer != null) { - sequenceNumber++; - } - - return buffer; - } - - @Override - public void notifySubpartitionConsumed() throws IOException { - queueIterator.notifySubpartitionConsumed(); - } - - @Override - public boolean isReleased() { - return queueIterator.isReleased(); - } - - @Override - public Throwable getFailureCause() { - return queueIterator.getFailureCause(); - } - - @Override - public boolean registerListener(NotificationListener ignored) throws IOException { - return queueIterator.registerListener(this); - } - - @Override - public void releaseAllResources() throws IOException { - queueIterator.releaseAllResources(); - } - - /** - * Enqueue this iterator again after a notification. 
- */ - @Override - public void onNotification() { - ctx.pipeline().fireUserEventTriggered(this); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java index e278d07..12b52ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionReq import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; -import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +52,10 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes private BufferPool bufferPool; PartitionRequestServerHandler( - ResultPartitionProvider partitionProvider, - TaskEventDispatcher taskEventDispatcher, - PartitionRequestQueue outboundQueue, - NetworkBufferPool networkBufferPool) { + ResultPartitionProvider partitionProvider, + TaskEventDispatcher taskEventDispatcher, + PartitionRequestQueue outboundQueue, + NetworkBufferPool networkBufferPool) { this.partitionProvider = partitionProvider; this.taskEventDispatcher = taskEventDispatcher; @@ -94,15 +93,16 @@ class 
PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request); try { - ResultSubpartitionView subpartition = - partitionProvider.createSubpartitionView( - request.partitionId, - request.queueIndex, - bufferPool); - - outboundQueue.enqueue(subpartition, request.receiverId); - } - catch (PartitionNotFoundException notFound) { + SequenceNumberingViewReader reader = new SequenceNumberingViewReader( + request.receiverId, + outboundQueue); + + reader.requestSubpartitionView( + partitionProvider, + request.partitionId, + request.queueIndex, + bufferPool); + } catch (PartitionNotFoundException notFound) { respondWithError(ctx, notFound, request.receiverId); } } @@ -115,20 +115,16 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes if (!taskEventDispatcher.publish(request.partitionId, request.event)) { respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId); } - } - else if (msgClazz == CancelPartitionRequest.class) { + } else if (msgClazz == CancelPartitionRequest.class) { CancelPartitionRequest request = (CancelPartitionRequest) msg; outboundQueue.cancel(request.receiverId); - } - else if (msgClazz == CloseRequest.class) { + } else if (msgClazz == CloseRequest.class) { outboundQueue.close(); - } - else { + } else { LOG.warn("Received unexpected client request: {}", msg); } - } - catch (Throwable t) { + } catch (Throwable t) { respondWithError(ctx, t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java new 
file mode 100644 index 0000000..ef611eb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simple wrapper for the partition readerQueue iterator, which increments a + * sequence number for each returned buffer and remembers the receiver ID. 
+ * + * <p>It also keeps track of available buffers and notifies the outbound + * handler about non-emptiness, similar to the {@link LocalInputChannel}. + */ +class SequenceNumberingViewReader implements BufferAvailabilityListener { + + private final Object requestLock = new Object(); + + private final InputChannelID receiverId; + + private final AtomicLong numBuffersAvailable = new AtomicLong(); + + private final PartitionRequestQueue requestQueue; + + private volatile ResultSubpartitionView subpartitionView; + + private int sequenceNumber = -1; + + SequenceNumberingViewReader(InputChannelID receiverId, PartitionRequestQueue requestQueue) { + this.receiverId = receiverId; + this.requestQueue = requestQueue; + } + + void requestSubpartitionView( + ResultPartitionProvider partitionProvider, + ResultPartitionID resultPartitionId, + int subPartitionIndex, + BufferProvider bufferProvider) throws IOException { + + synchronized (requestLock) { + if (subpartitionView == null) { + // Because this call can trigger a notification, we have to + // schedule a separate task at the event loop that will + // start consuming this. Otherwise the reference to the + // view cannot be available in getNextBuffer(). 
+ this.subpartitionView = partitionProvider.createSubpartitionView( + resultPartitionId, + subPartitionIndex, + bufferProvider, + this); + } else { + throw new IllegalStateException("Subpartition already requested"); + } + } + } + + InputChannelID getReceiverId() { + return receiverId; + } + + int getSequenceNumber() { + return sequenceNumber; + } + + public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { + Buffer next = subpartitionView.getNextBuffer(); + if (next != null) { + long remaining = numBuffersAvailable.decrementAndGet(); + sequenceNumber++; + + if (remaining >= 0) { + return new BufferAndAvailability(next, remaining > 0); + } else { + throw new IllegalStateException("no buffer available"); + } + } else { + return null; + } + } + + public void notifySubpartitionConsumed() throws IOException { + subpartitionView.notifySubpartitionConsumed(); + } + + public boolean isReleased() { + return subpartitionView.isReleased(); + } + + public Throwable getFailureCause() { + return subpartitionView.getFailureCause(); + } + + public void releaseAllResources() throws IOException { + subpartitionView.releaseAllResources(); + } + + @Override + public void notifyBuffersAvailable(long numBuffers) { + // if this request made the channel non-empty, notify the input gate + if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { + requestQueue.notifyReaderNonEmpty(this); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java new file mode 100644 index 0000000..114ef7c --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +/** + * Listener interface implemented by consumers of {@link ResultSubpartitionView} + * that want to be notified of availability of further buffers. + */ +public interface BufferAvailabilityListener { + + /** + * Called whenever a new number of buffers becomes available. + * + * @param numBuffers The number of buffers that became available. 
+ */ + void notifyBuffersAvailable(long numBuffers); +} http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index b703acb..4d5e378 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.util.event.NotificationListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * A pipelined in-memory only subpartition, which can be consumed once. @@ -38,51 +39,47 @@ class PipelinedSubpartition extends ResultSubpartition { private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class); + // ------------------------------------------------------------------------ + + /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */ + private final ArrayDeque<Buffer> buffers = new ArrayDeque<>(); + + /** The read view to consume this subpartition. 
*/ + private PipelinedSubpartitionView readView; + /** Flag indicating whether the subpartition has been finished. */ private boolean isFinished; /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; - /** - * A data availability listener. Registered, when the consuming task is faster than the - * producing task. - */ - private NotificationListener registeredListener; - - /** The read view to consume this subpartition. */ - private PipelinedSubpartitionView readView; - - /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */ - final ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>(); + // ------------------------------------------------------------------------ PipelinedSubpartition(int index, ResultPartition parent) { super(index, parent); } @Override - public boolean add(Buffer buffer) { + public boolean add(Buffer buffer) throws IOException { checkNotNull(buffer); - final NotificationListener listener; + // view reference accessible outside the lock, but assigned inside the locked scope + final PipelinedSubpartitionView reader; synchronized (buffers) { - if (isReleased || isFinished) { + if (isFinished || isReleased) { return false; } // Add the buffer and update the stats buffers.add(buffer); + reader = readView; updateStatistics(buffer); - - // Get the listener... 
- listener = registeredListener; - registeredListener = null; } // Notify the listener outside of the synchronized block - if (listener != null) { - listener.onNotification(); + if (reader != null) { + reader.notifyBuffersAvailable(1); } return true; @@ -90,36 +87,34 @@ class PipelinedSubpartition extends ResultSubpartition { @Override public void finish() throws IOException { - final NotificationListener listener; + final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + + // view reference accessible outside the lock, but assigned inside the locked scope + final PipelinedSubpartitionView reader; synchronized (buffers) { - if (isReleased || isFinished) { + if (isFinished || isReleased) { return; } - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); - buffers.add(buffer); + reader = readView; updateStatistics(buffer); isFinished = true; - - LOG.debug("Finished {}.", this); - - // Get the listener... - listener = registeredListener; - registeredListener = null; } + LOG.debug("Finished {}.", this); + // Notify the listener outside of the synchronized block - if (listener != null) { - listener.onNotification(); + if (reader != null) { + reader.notifyBuffersAvailable(1); } } @Override public void release() { - final NotificationListener listener; + // view reference accessible outside the lock, but assigned inside the locked scope final PipelinedSubpartitionView view; synchronized (buffers) { @@ -130,40 +125,35 @@ class PipelinedSubpartition extends ResultSubpartition { // Release all available buffers Buffer buffer; while ((buffer = buffers.poll()) != null) { - if (!buffer.isRecycled()) { - buffer.recycle(); - } + buffer.recycle(); } // Get the view... view = readView; readView = null; - // Get the listener... 
- listener = registeredListener; - registeredListener = null; - // Make sure that no further buffers are added to the subpartition isReleased = true; - - LOG.debug("Released {}.", this); } + LOG.debug("Released {}.", this); + // Release all resources of the view if (view != null) { view.releaseAllResources(); } + } - // Notify the listener outside of the synchronized block - if (listener != null) { - listener.onNotification(); + Buffer pollBuffer() { + synchronized (buffers) { + return buffers.pollFirst(); } } @Override public int releaseMemory() { - // The pipelined subpartition does not react to memory release requests. The buffers will be - // recycled by the consuming task. + // The pipelined subpartition does not react to memory release requests. + // The buffers will be recycled by the consuming task. return 0; } @@ -173,52 +163,42 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override - public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) { - synchronized (buffers) { - if (readView != null) { - throw new IllegalStateException("Subpartition " + index + " of " - + parent.getPartitionId() + " is being or already has been " + - "consumed, but pipelined subpartitions can only be consumed once."); - } + public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { + final int queueSize; - readView = new PipelinedSubpartitionView(this); + synchronized (buffers) { + checkState(!isReleased); + checkState(readView == null, + "Subpartition %s of is being (or already has been) consumed, " + + "but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId()); - LOG.debug("Created read view for subpartition {} of partition {}.", index, parent.getPartitionId()); + LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId()); - return readView; + queueSize = buffers.size(); + 
readView = new PipelinedSubpartitionView(this, availabilityListener); } + + readView.notifyBuffersAvailable(queueSize); + + return readView; } @Override public String toString() { - synchronized (buffers) { - return String.format("PipelinedSubpartition [number of buffers: %d (%d bytes), " + - "finished? %s, read view? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null); - } - } + final long numBuffers; + final long numBytes; + final boolean finished; + final boolean hasReadView; - /** - * Registers a listener with this subpartition and returns whether the registration was - * successful. - * - * <p> A registered listener is notified when the state of the subpartition changes. After a - * notification, the listener is unregistered. Only a single listener is allowed to be - * registered. - */ - boolean registerListener(NotificationListener listener) { synchronized (buffers) { - if (!buffers.isEmpty() || isReleased) { - return false; - } - - if (registeredListener == null) { - registeredListener = listener; - - return true; - } - - throw new IllegalStateException("Already registered listener."); + numBuffers = getTotalNumberOfBuffers(); + numBytes = getTotalNumberOfBytes(); + finished = isFinished; + hasReadView = readView != null; } + + return String.format( + "PipelinedSubpartition [number of buffers: %d (%d bytes), finished? %s, read view? 
%s]", + numBuffers, numBytes, finished, hasReadView); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index f8d81a4..52c78ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.util.event.NotificationListener; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -33,23 +33,25 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { /** The subpartition this view belongs to. */ private final PipelinedSubpartition parent; + private final BufferAvailabilityListener availabilityListener; + /** Flag indicating whether this view has been released. 
*/ - private AtomicBoolean isReleased = new AtomicBoolean(); + private final AtomicBoolean isReleased; - PipelinedSubpartitionView(PipelinedSubpartition parent) { + PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { this.parent = checkNotNull(parent); + this.availabilityListener = checkNotNull(listener); + this.isReleased = new AtomicBoolean(); } @Override public Buffer getNextBuffer() { - synchronized (parent.buffers) { - return parent.buffers.poll(); - } + return parent.pollBuffer(); } @Override - public boolean registerListener(NotificationListener listener) { - return !isReleased.get() && parent.registerListener(listener); + public void notifyBuffersAvailable(long numBuffers) throws IOException { + availabilityListener.notifyBuffersAvailable(numBuffers); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index c30f333..9da919e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; @@ 
-131,7 +130,6 @@ public class ResultPartition implements BufferPoolOwner { ResultPartitionManager partitionManager, ResultPartitionConsumableNotifier partitionConsumableNotifier, IOManager ioManager, - IOMode defaultIoMode, boolean sendScheduleOrUpdateConsumersMessage) { this.owningTaskName = checkNotNull(owningTaskName); @@ -147,8 +145,7 @@ public class ResultPartition implements BufferPoolOwner { switch (partitionType) { case BLOCKING: for (int i = 0; i < subpartitions.length; i++) { - subpartitions[i] = new SpillableSubpartition( - i, this, ioManager, defaultIoMode); + subpartitions[i] = new SpillableSubpartition(i, this, ioManager); } break; @@ -321,7 +318,7 @@ public class ResultPartition implements BufferPoolOwner { /** * Returns the requested subpartition. */ - public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider) throws IOException { + public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { int refCnt = pendingReferences.get(); checkState(refCnt != -1, "Partition released."); @@ -329,7 +326,7 @@ public class ResultPartition implements BufferPoolOwner { checkElementIndex(index, subpartitions.length, "Subpartition not found."); - ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider); + ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener); LOG.debug("Created {}", readView); http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java index 
9da3e14..8ad3e34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -66,7 +66,8 @@ public class ResultPartitionManager implements ResultPartitionProvider { public ResultSubpartitionView createSubpartitionView( ResultPartitionID partitionId, int subpartitionIndex, - BufferProvider bufferProvider) throws IOException { + BufferProvider bufferProvider, + BufferAvailabilityListener availabilityListener) throws IOException { synchronized (registeredPartitions) { final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(), @@ -78,7 +79,7 @@ public class ResultPartitionManager implements ResultPartitionProvider { LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition); - return partition.createSubpartitionView(subpartitionIndex, bufferProvider); + return partition.createSubpartitionView(subpartitionIndex, bufferProvider, availabilityListener); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java index 23dd1d3..3fbfd49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java @@ -30,6 +30,7 @@ public interface ResultPartitionProvider { ResultSubpartitionView createSubpartitionView( ResultPartitionID partitionId, int index, - BufferProvider bufferProvider) throws IOException; + BufferProvider 
bufferProvider, + BufferAvailabilityListener availabilityListener) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index b7ca9c4..0fafe79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -37,7 +37,7 @@ public abstract class ResultSubpartition { // - Statistics ---------------------------------------------------------- /** The total number of buffers (both data and event buffers) */ - private int totalNumberOfBuffers; + private long totalNumberOfBuffers; /** The total number of bytes (both data and event buffers) */ private long totalNumberOfBytes; @@ -52,7 +52,7 @@ public abstract class ResultSubpartition { totalNumberOfBytes += buffer.getSize(); } - protected int getTotalNumberOfBuffers() { + protected long getTotalNumberOfBuffers() { return totalNumberOfBuffers; } @@ -77,7 +77,7 @@ public abstract class ResultSubpartition { abstract public void release() throws IOException; - abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException; + abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException; abstract int releaseMemory() throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index cfc5455..98be90f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.util.event.NotificationListener; import java.io.IOException; @@ -41,13 +40,7 @@ public interface ResultSubpartitionView { */ Buffer getNextBuffer() throws IOException, InterruptedException; - /** - * Subscribes to data availability notifications. - * <p> - * Returns whether the subscription was successful. A subscription fails, - * if there is data available. 
- */ - boolean registerListener(NotificationListener listener) throws IOException; + void notifyBuffersAvailable(long buffers) throws IOException; void releaseAllResources() throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 3e4692a..b584ebb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,42 +18,54 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; +import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A blocking in-memory subpartition, which is able to spill to disk. + * A spillable sub partition starts out in-memory and spills to disk if asked + * to do so. * - * <p> Buffers are kept in-memory as long as possible. 
If not possible anymore, all buffers are - * spilled to disk. + * <p>Buffers for the partition come from a {@link BufferPool}. The buffer pool + * is also responsible to trigger the release of the buffers if it needs them + * back. At this point, the spillable sub partition will write all in-memory + * buffers to disk. All added buffers after that point directly go to disk. + * + * <p>This partition type is used for {@link ResultPartitionType#BLOCKING} + * results that are fully produced before they can be consumed. At the point + * when they are consumed, the buffers are (i) all in-memory, (ii) currently + * being spilled to disk, or (iii) completely spilled to disk. Depending on + * this state, different reader variants are returned (see + * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}). + * + * <p>Since the network buffer pool size is usually quite small (default is + * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most + * spillable partitions will be spilled for real-world data sets. */ class SpillableSubpartition extends ResultSubpartition { private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class); - /** All buffers of this subpartition. */ - final ArrayList<Buffer> buffers = new ArrayList<Buffer>(); + /** Buffers are kept in this queue as long as we weren't ask to release any. */ + private final ArrayDeque<Buffer> buffers = new ArrayDeque<>(); - /** The I/O manager to create the spill writer from. */ - final IOManager ioManager; - - /** The default I/O mode to use. */ - final IOMode ioMode; + /** The I/O manager used for spilling buffers to disk. */ + private final IOManager ioManager; /** The writer used for spilling. As long as this is null, we are in-memory. */ - BufferFileWriter spillWriter; + private BufferFileWriter spillWriter; /** Flag indicating whether the subpartition has been finished. 
*/ private boolean isFinished; @@ -64,11 +76,10 @@ class SpillableSubpartition extends ResultSubpartition { /** The read view to consume this subpartition. */ private ResultSubpartitionView readView; - SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager, IOMode ioMode) { + SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager) { super(index, parent); this.ioManager = checkNotNull(ioManager); - this.ioMode = checkNotNull(ioMode); } @Override @@ -80,7 +91,6 @@ class SpillableSubpartition extends ResultSubpartition { return false; } - // In-memory if (spillWriter == null) { buffers.add(buffer); @@ -88,7 +98,7 @@ class SpillableSubpartition extends ResultSubpartition { } } - // Else: Spilling + // Didn't return early => go to disk spillWriter.writeBlock(buffer); return true; @@ -102,7 +112,7 @@ class SpillableSubpartition extends ResultSubpartition { } } - // If we are spilling/have spilled, wait for the writer to finish. + // If we are spilling/have spilled, wait for the writer to finish if (spillWriter != null) { spillWriter.close(); } @@ -117,51 +127,93 @@ class SpillableSubpartition extends ResultSubpartition { return; } - // Recycle all in-memory buffers - for (Buffer buffer : buffers) { - buffer.recycle(); - } - - buffers.clear(); - buffers.trimToSize(); + view = readView; - // If we are spilling/have spilled, wait for the writer to finish and delete the file. - if (spillWriter != null) { - spillWriter.closeAndDelete(); + // No consumer yet, we are responsible to clean everything up. If + // one is available, the view is responsible is to clean up (see + // below). + if (view == null) { + for (Buffer buffer : buffers) { + buffer.recycle(); + } + buffers.clear(); + + // TODO This can block until all buffers are written out to + // disk if a spill is in-progress before deleting the file. + // It is possibly called from the Netty event loop threads, + // which can bring down the network. 
+ if (spillWriter != null) { + spillWriter.closeAndDelete(); + } } - // Get the view... - view = readView; - readView = null; - isReleased = true; } - // Release the view outside of the synchronized block if (view != null) { - view.notifySubpartitionConsumed(); + view.releaseAllResources(); + } + } + + @Override + public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { + synchronized (buffers) { + if (!isFinished) { + throw new IllegalStateException("Subpartition has not been finished yet, " + + "but blocking subpartitions can only be consumed after they have " + + "been finished."); + } + + if (readView != null) { + throw new IllegalStateException("Subpartition is being or already has been " + + "consumed, but we currently allow subpartitions to only be consumed once."); + } + + if (spillWriter != null) { + readView = new SpilledSubpartitionView( + this, + bufferProvider.getMemorySegmentSize(), + spillWriter, + getTotalNumberOfBuffers(), + availabilityListener); + } else { + readView = new SpillableSubpartitionView( + this, + buffers, + ioManager, + bufferProvider.getMemorySegmentSize(), + availabilityListener); + } + + return readView; } } @Override public int releaseMemory() throws IOException { synchronized (buffers) { - if (spillWriter == null) { - // Create the spill writer + ResultSubpartitionView view = readView; + + if (view != null && view.getClass() == SpillableSubpartitionView.class) { + // If there is a spilalble view, it's the responsibility of the + // view to release memory. 
+ SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view; + return spillableView.releaseMemory(); + } else if (spillWriter == null) { + // No view and in-memory => spill to disk spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel()); - final int numberOfBuffers = buffers.size(); - + int numberOfBuffers = buffers.size(); long spilledBytes = 0; // Spill all buffers for (int i = 0; i < numberOfBuffers; i++) { - Buffer buffer = buffers.remove(0); + Buffer buffer = buffers.remove(); spilledBytes += buffer.getSize(); spillWriter.writeBlock(buffer); } - LOG.debug("Spilled {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId()); + LOG.debug("Spilling {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId()); return numberOfBuffers; } @@ -177,54 +229,11 @@ class SpillableSubpartition extends ResultSubpartition { } @Override - public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException { - synchronized (buffers) { - if (!isFinished) { - throw new IllegalStateException("Subpartition has not been finished yet, " + - "but blocking subpartitions can only be consumed after they have " + - "been finished."); - } - - if (readView != null) { - throw new IllegalStateException("Subpartition is being or already has been " + - "consumed, but we currently allow subpartitions to only be consumed once."); - } - - // Spilled if closed and no outstanding write requests - boolean isSpilled = spillWriter != null && (spillWriter.isClosed() - || spillWriter.getNumberOfOutstandingRequests() == 0); - - if (isSpilled) { - if (ioMode.isSynchronous()) { - readView = new SpilledSubpartitionViewSyncIO( - this, - bufferProvider.getMemorySegmentSize(), - spillWriter.getChannelID(), - 0); - } - else { - readView = new SpilledSubpartitionViewAsyncIO( - this, - bufferProvider, - ioManager, - spillWriter.getChannelID(), - 0); - } - } - else { - readView = new 
SpillableSubpartitionView( - this, bufferProvider, buffers.size(), ioMode); - } - - return readView; - } - } - - @Override public String toString() { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + "finished? %s, read view? %s, spilled? %s]", getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, spillWriter != null); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 29c2002..8119ecc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -18,15 +18,14 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.util.event.NotificationListener; import java.io.IOException; +import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; class SpillableSubpartitionView implements ResultSubpartitionView { @@ -34,146 +33,163 @@ class SpillableSubpartitionView implements ResultSubpartitionView { /** The subpartition this view belongs 
to. */ private final SpillableSubpartition parent; - /** The buffer provider to read buffers into (spilling case). */ - private final BufferProvider bufferProvider; + /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */ + private final ArrayDeque<Buffer> buffers; - /** The number of buffers in-memory at the subpartition. */ - private final int numberOfBuffers; + /** IO manager if we need to spill (for spilled case). */ + private final IOManager ioManager; - /** The default I/O mode to use. */ - private final IOMode ioMode; + /** Size of memory segments (for spilled case). */ + private final int memorySegmentSize; - private ResultSubpartitionView spilledView; - - private int currentQueuePosition; - - private long currentBytesRead; + /** + * The buffer availability listener. As long as in-memory, notifications + * happen on a buffer per buffer basis as spilling may happen after a + * notification has been sent out. + */ + private final BufferAvailabilityListener listener; private final AtomicBoolean isReleased = new AtomicBoolean(false); - public SpillableSubpartitionView( - SpillableSubpartition parent, - BufferProvider bufferProvider, - int numberOfBuffers, - IOMode ioMode) { - - this.parent = checkNotNull(parent); - this.bufferProvider = checkNotNull(bufferProvider); - checkArgument(numberOfBuffers >= 0); - this.numberOfBuffers = numberOfBuffers; - this.ioMode = checkNotNull(ioMode); - } - - @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { - - if (isReleased.get()) { - return null; - } - - // 1) In-memory - synchronized (parent.buffers) { - if (parent.isReleased()) { - return null; - } + /** + * The next buffer to hand out. Everytime this is set to a non-null value, + * a listener notification happens. 
+ */ + private Buffer nextBuffer; - if (parent.spillWriter == null) { - if (currentQueuePosition < numberOfBuffers) { - Buffer buffer = parent.buffers.get(currentQueuePosition); + private volatile SpilledSubpartitionView spilledView; - buffer.retain(); + SpillableSubpartitionView( + SpillableSubpartition parent, + ArrayDeque<Buffer> buffers, + IOManager ioManager, + int memorySegmentSize, + BufferAvailabilityListener listener) { - // TODO Fix hard coding of 8 bytes for the header - currentBytesRead += buffer.getSize() + 8; - currentQueuePosition++; - - return buffer; - } + this.parent = checkNotNull(parent); + this.buffers = checkNotNull(buffers); + this.ioManager = checkNotNull(ioManager); + this.memorySegmentSize = memorySegmentSize; + this.listener = checkNotNull(listener); - return null; - } + synchronized (buffers) { + nextBuffer = buffers.poll(); } - // 2) Spilled - if (spilledView != null) { - return spilledView.getNextBuffer(); + if (nextBuffer != null) { + listener.notifyBuffersAvailable(1); } + } - // 3) Spilling - // Make sure that all buffers are written before consuming them. We can't block here, - // because this might be called from an network I/O thread. - if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) { - return null; - } + int releaseMemory() throws IOException { + synchronized (buffers) { + if (spilledView != null || nextBuffer == null) { + // Already spilled or nothing in-memory + return 0; + } else { + // We don't touch next buffer, because a notification has + // already been sent for it. Only when it is consumed, will + // it be recycled. 
+ + // Create the spill writer and write all buffers to disk + BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel()); + + int numBuffers = buffers.size(); + for (int i = 0; i < numBuffers; i++) { + Buffer buffer = buffers.remove(); + try { + spillWriter.writeBlock(buffer); + } finally { + buffer.recycle(); + } + } - if (ioMode.isSynchronous()) { - spilledView = new SpilledSubpartitionViewSyncIO( - parent, - bufferProvider.getMemorySegmentSize(), - parent.spillWriter.getChannelID(), - currentBytesRead); - } - else { - spilledView = new SpilledSubpartitionViewAsyncIO( + spilledView = new SpilledSubpartitionView( parent, - bufferProvider, - parent.ioManager, - parent.spillWriter.getChannelID(), - currentBytesRead); - } + memorySegmentSize, + spillWriter, + numBuffers, + listener); - return spilledView.getNextBuffer(); + return numBuffers; + } + } } @Override - public boolean registerListener(NotificationListener listener) throws IOException { - if (spilledView == null) { - synchronized (parent.buffers) { - // Didn't spill yet, buffers should be in-memory - if (parent.spillWriter == null) { - return false; + public Buffer getNextBuffer() throws IOException, InterruptedException { + synchronized (buffers) { + if (isReleased.get()) { + return null; + } else if (nextBuffer != null) { + Buffer current = nextBuffer; + nextBuffer = buffers.poll(); + + if (nextBuffer != null) { + listener.notifyBuffersAvailable(1); } - } - // Spilling - if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) { - return parent.spillWriter.registerAllRequestsProcessedListener(listener); + return current; } + } // else: spilled - return false; + SpilledSubpartitionView spilled = spilledView; + if (spilled != null) { + return spilled.getNextBuffer(); + } else { + throw new IllegalStateException("No in-memory buffers available, but also nothing spilled."); } - - return spilledView.registerListener(listener); } @Override - public void 
notifySubpartitionConsumed() throws IOException { - parent.onConsumedSubpartition(); + public void notifyBuffersAvailable(long buffers) throws IOException { + // We do the availability listener notification one by one } @Override public void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { - if (spilledView != null) { - spilledView.releaseAllResources(); + SpilledSubpartitionView spilled = spilledView; + if (spilled != null) { + spilled.releaseAllResources(); } } } @Override + public void notifySubpartitionConsumed() throws IOException { + SpilledSubpartitionView spilled = spilledView; + if (spilled != null) { + spilled.notifySubpartitionConsumed(); + } else { + parent.onConsumedSubpartition(); + } + } + + @Override public boolean isReleased() { - return parent.isReleased() || isReleased.get(); + SpilledSubpartitionView spilled = spilledView; + if (spilled != null) { + return spilled.isReleased(); + } else { + return parent.isReleased() || isReleased.get(); + } } @Override public Throwable getFailureCause() { - return parent.getFailureCause(); + SpilledSubpartitionView spilled = spilledView; + if (spilled != null) { + return spilled.getFailureCause(); + } else { + return parent.getFailureCause(); + } } @Override public String toString() { return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s", - parent.index, - parent.parent.getPartitionId()); + parent.index, + parent.parent.getPartitionId()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java new file mode 100644 index 0000000..b087a4e --- /dev/null 
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.util.event.NotificationListener; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Reader for a spilled sub partition. + * + * <p>The partition availability listener is notified about available buffers + * only when the spilling is done. 
Spilling is done async and if it is still + * in progress, we wait with the notification until the spilling is done. + * + * <p>Reads of the spilled file are done synchronously. + */ +class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener { + + /** The subpartition this view belongs to. */ + private final ResultSubpartition parent; + + /** Writer for spills. */ + private final BufferFileWriter spillWriter; + + /** The synchronous file reader to do the actual I/O. */ + private final BufferFileReader fileReader; + + /** The buffer pool to read data into. */ + private final SpillReadBufferPool bufferPool; + + /** Buffer availability listener. */ + private final BufferAvailabilityListener availabilityListener; + + /** The total number of spilled buffers. */ + private final long numberOfSpilledBuffers; + + /** Flag indicating whether all resources have been released. */ + private AtomicBoolean isReleased = new AtomicBoolean(); + + /** Flag indicating whether a spill is still in progress. */ + private volatile boolean isSpillInProgress = true; + + SpilledSubpartitionView( + ResultSubpartition parent, + int memorySegmentSize, + BufferFileWriter spillWriter, + long numberOfSpilledBuffers, + BufferAvailabilityListener availabilityListener) throws IOException { + + this.parent = checkNotNull(parent); + this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize); + this.spillWriter = checkNotNull(spillWriter); + this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false); + checkArgument(numberOfSpilledBuffers >= 0); + this.numberOfSpilledBuffers = numberOfSpilledBuffers; + this.availabilityListener = checkNotNull(availabilityListener); + + // Check whether async spilling is still in progress. If not, this returns + // false and we can notify our availability listener about all available buffers. + // Otherwise, we notify only when the spill writer callback happens. 
+ if (!spillWriter.registerAllRequestsProcessedListener(this)) { + isSpillInProgress = false; + availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers); + } + } + + /** + * This is the call back method for the spill writer. If a spill is still + * in progress when this view is created we wait until this method is called + * before we notify the availability listener. + */ + @Override + public void onNotification() { + isSpillInProgress = false; + availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers); + } + + @Override + public Buffer getNextBuffer() throws IOException, InterruptedException { + if (fileReader.hasReachedEndOfFile() || isSpillInProgress) { + return null; + } + + // TODO This is fragile as we implicitly expect that multiple calls to + // this method don't happen before recycling buffers returned earlier. + Buffer buffer = bufferPool.requestBufferBlocking(); + fileReader.readInto(buffer); + + return buffer; + } + + @Override + public void notifyBuffersAvailable(long buffers) throws IOException { + // We do the availability listener notification either directly on + // construction of this view (when everything has been spilled) or + // as soon as spilling is done and we are notified about it in the + // #onNotification callback. + } + + @Override + public void notifySubpartitionConsumed() throws IOException { + parent.onConsumedSubpartition(); + } + + @Override + public void releaseAllResources() throws IOException { + if (isReleased.compareAndSet(false, true)) { + // TODO This can block until all buffers are written out to + // disk if a spill is in-progress before deleting the file. + // It is possibly called from the Netty event loop threads, + // which can bring down the network. 
+ spillWriter.closeAndDelete(); + + fileReader.close(); + bufferPool.destroy(); + } + } + + @Override + public boolean isReleased() { + return parent.isReleased() || isReleased.get(); + } + + @Override + public Throwable getFailureCause() { + return parent.getFailureCause(); + } + + @Override + public String toString() { + return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId()); + } + + /** + * A buffer pool to provide buffer to read the file into. + * + * <p>This pool ensures that a consuming input gate makes progress in all cases, even when all + * buffers of the input gate buffer pool have been requested by remote input channels. + */ + private static class SpillReadBufferPool implements BufferRecycler { + + private final Queue<Buffer> buffers; + + private boolean isDestroyed; + + SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) { + this.buffers = new ArrayDeque<>(numberOfBuffers); + + synchronized (buffers) { + for (int i = 0; i < numberOfBuffers; i++) { + buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this)); + } + } + } + + @Override + public void recycle(MemorySegment memorySegment) { + synchronized (buffers) { + if (isDestroyed) { + memorySegment.free(); + } else { + buffers.add(new Buffer(memorySegment, this)); + buffers.notifyAll(); + } + } + } + + private Buffer requestBufferBlocking() throws InterruptedException { + synchronized (buffers) { + while (true) { + if (isDestroyed) { + return null; + } + + Buffer buffer = buffers.poll(); + + if (buffer != null) { + return buffer; + } + // Else: wait for a buffer + buffers.wait(); + } + } + } + + private void destroy() { + synchronized (buffers) { + isDestroyed = true; + buffers.notifyAll(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java deleted file mode 100644 index daccd28..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.partition; - -import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; -import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.util.event.NotificationListener; - -import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * View over a spilled subpartition. - * - * <p> Reads are triggered asynchronously in batches of configurable size. - */ -class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView { - - private final static int DEFAULT_READ_BATCH_SIZE = 2; - - private final Object lock = new Object(); - - /** The subpartition this view belongs to. */ - private final ResultSubpartition parent; - - /** The buffer provider to get the buffer read everything into. */ - private final BufferProvider bufferProvider; - - /** The buffer availability listener to be notified on available buffers. */ - private final BufferProviderCallback bufferAvailabilityListener; - - /** The size of read batches. */ - private final int readBatchSize; - - /** - * The size of the current batch (>= 0 and <= the configured batch size). Reads are only - * triggered when the size of the current batch is 0. - */ - private final AtomicInteger currentBatchSize = new AtomicInteger(); - - /** The asynchronous file reader to do the actual I/O. */ - private final BufferFileReader asyncFileReader; - - /** The buffers, which have been returned from the file reader. 
*/ - private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue<Buffer>(); - - /** A data availability listener. */ - private NotificationListener registeredListener; - - /** Error, which has occurred in the I/O thread. */ - private volatile IOException errorInIOThread; - - /** Flag indicating whether all resources have been released. */ - private volatile boolean isReleased; - - /** Flag indicating whether we reached EOF at the file reader. */ - private volatile boolean hasReachedEndOfFile; - - /** Spilled file size */ - private final long fileSize; - - SpilledSubpartitionViewAsyncIO( - ResultSubpartition parent, - BufferProvider bufferProvider, - IOManager ioManager, - FileIOChannel.ID channelId, - long initialSeekPosition) throws IOException { - - this(parent, bufferProvider, ioManager, channelId, initialSeekPosition, DEFAULT_READ_BATCH_SIZE); - } - - SpilledSubpartitionViewAsyncIO( - ResultSubpartition parent, - BufferProvider bufferProvider, - IOManager ioManager, - FileIOChannel.ID channelId, - long initialSeekPosition, - int readBatchSize) throws IOException { - - checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0."); - checkArgument(readBatchSize >= 1, "Batch read size < 1."); - - this.parent = checkNotNull(parent); - this.bufferProvider = checkNotNull(bufferProvider); - this.bufferAvailabilityListener = new BufferProviderCallback(this); - - this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this)); - - if (initialSeekPosition > 0) { - asyncFileReader.seekToPosition(initialSeekPosition); - } - - this.readBatchSize = readBatchSize; - - this.fileSize = asyncFileReader.getSize(); - - // Trigger the initial read requests - readNextBatchAsync(); - } - - @Override - public Buffer getNextBuffer() throws IOException { - checkError(); - - final Buffer buffer = returnedBuffers.poll(); - - // No buffer returned from the I/O thread currently. 
Either the current batch is in progress - // or we trigger the next one. - if (buffer == null) { - if (currentBatchSize.get() == 0) { - readNextBatchAsync(); - } - } - else { - currentBatchSize.decrementAndGet(); - } - - return buffer; - } - - @Override - public boolean registerListener(NotificationListener listener) throws IOException { - checkNotNull(listener); - - checkError(); - - synchronized (lock) { - if (isReleased || !returnedBuffers.isEmpty()) { - return false; - } - - if (registeredListener == null) { - registeredListener = listener; - - return true; - } - } - - throw new IllegalStateException("Already registered listener."); - } - - @Override - public void notifySubpartitionConsumed() throws IOException { - parent.onConsumedSubpartition(); - } - - @Override - public void releaseAllResources() throws IOException { - try { - synchronized (lock) { - if (!isReleased) { - // Recycle all buffers. Buffers, which are in flight are recycled as soon as - // they return from the I/O thread. - Buffer buffer; - while ((buffer = returnedBuffers.poll()) != null) { - buffer.recycle(); - } - - isReleased = true; - } - } - } - finally { - asyncFileReader.close(); - } - } - - @Override - public boolean isReleased() { - return parent.isReleased() || isReleased; - } - - @Override - public Throwable getFailureCause() { - return parent.getFailureCause(); - } - - /** - * Requests buffers from the buffer provider and triggers asynchronous read requests to fill - * them. - * - * <p> The number of requested buffers/triggered I/O read requests per call depends on the - * configured size of batch reads. - */ - private void readNextBatchAsync() throws IOException { - // This does not need to be fully synchronized with actually reaching EOF as long as - // we eventually notice it. In the worst case, we trigger some discarded reads and - // notice it when the buffers are returned. - // - // We only trigger reads if the current batch size is 0. 
- if (hasReachedEndOfFile || currentBatchSize.get() != 0) { - return; - } - - // Number of successful buffer requests or callback registrations. The call back will - // trigger the read as soon as a buffer becomes available again. - int i = 0; - - while (i < readBatchSize) { - final Buffer buffer = bufferProvider.requestBuffer(); - - if (buffer == null) { - // Listen for buffer availability. - currentBatchSize.incrementAndGet(); - - if (bufferProvider.addListener(bufferAvailabilityListener)) { - i++; - } - else if (bufferProvider.isDestroyed()) { - currentBatchSize.decrementAndGet(); - return; - } - else { - // Buffer available again - currentBatchSize.decrementAndGet(); - } - } - else { - currentBatchSize.incrementAndGet(); - - asyncFileReader.readInto(buffer); - } - } - } - - /** - * Returns a buffer from the buffer provider. - * - * <p> Note: This method is called from the thread recycling the available buffer. - */ - private void onAvailableBuffer(Buffer buffer) { - try { - asyncFileReader.readInto(buffer); - } - catch (IOException e) { - notifyError(e); - } - } - - /** - * Returns a successful buffer read request. - * - * <p> Note: This method is always called from the same I/O thread. - */ - private void returnBufferFromIOThread(Buffer buffer) { - final NotificationListener listener; - - synchronized (lock) { - if (hasReachedEndOfFile || isReleased) { - buffer.recycle(); - - return; - } - - returnedBuffers.add(buffer); - - listener = registeredListener; - registeredListener = null; - - // If this was the last buffer before we reached EOF, set the corresponding flag to - // ensure that further buffers are correctly recycled and eventually no further reads - // are triggered. - if (asyncFileReader.hasReachedEndOfFile()) { - hasReachedEndOfFile = true; - } - } - - if (listener != null) { - listener.onNotification(); - } - } - - /** - * Notifies the view about an error. 
- */ - private void notifyError(IOException error) { - if (errorInIOThread == null) { - errorInIOThread = error; - } - - final NotificationListener listener; - - synchronized (lock) { - listener = registeredListener; - registeredListener = null; - } - - if (listener != null) { - listener.onNotification(); - } - } - - /** - * Checks whether an error has been reported and rethrow the respective Exception, if available. - */ - private void checkError() throws IOException { - if (errorInIOThread != null) { - throw errorInIOThread; - } - } - - /** - * Callback from the I/O thread. - * - * <p> Successful buffer read requests add the buffer to the subpartition view, and failed ones - * notify about the error. - */ - private static class IOThreadCallback implements RequestDoneCallback<Buffer> { - - private final SpilledSubpartitionViewAsyncIO subpartitionView; - - public IOThreadCallback(SpilledSubpartitionViewAsyncIO subpartitionView) { - this.subpartitionView = subpartitionView; - } - - @Override - public void requestSuccessful(Buffer buffer) { - subpartitionView.returnBufferFromIOThread(buffer); - } - - @Override - public void requestFailed(Buffer buffer, IOException error) { - // Recycle the buffer and forward the error - buffer.recycle(); - - subpartitionView.notifyError(error); - } - } - - @Override - public String toString() { - return String.format("SpilledSubpartitionView[async](index: %d, file size: %d bytes) of ResultPartition %s", - parent.index, - fileSize, - parent.parent.getPartitionId()); - } - - /** - * Callback from the buffer provider. 
- */ - private static class BufferProviderCallback implements EventListener<Buffer> { - - private final SpilledSubpartitionViewAsyncIO subpartitionView; - - private BufferProviderCallback(SpilledSubpartitionViewAsyncIO subpartitionView) { - this.subpartitionView = subpartitionView; - } - - @Override - public void onEvent(Buffer buffer) { - if (buffer == null) { - return; - } - - subpartitionView.onAvailableBuffer(buffer); - } - } -}
