http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java deleted file mode 100644 index c86697f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.partition; - -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; -import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; -import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; -import org.apache.flink.runtime.util.event.NotificationListener; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * View over a spilled subpartition. - * - * <p> Reads are done synchronously. - */ -class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView { - - /** The subpartition this view belongs to. */ - private final ResultSubpartition parent; - - /** The synchronous file reader to do the actual I/O. */ - private final BufferFileReader fileReader; - - /** The buffer pool to read data into. */ - private final SpillReadBufferPool bufferPool; - - /** Flag indicating whether all resources have been released. 
*/ - private AtomicBoolean isReleased = new AtomicBoolean(); - - /** Spilled file size */ - private final long fileSize; - - SpilledSubpartitionViewSyncIO( - ResultSubpartition parent, - int memorySegmentSize, - FileIOChannel.ID channelId, - long initialSeekPosition) throws IOException { - - checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0."); - - this.parent = checkNotNull(parent); - - this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize); - - this.fileReader = new SynchronousBufferFileReader(channelId, false); - - if (initialSeekPosition > 0) { - fileReader.seekToPosition(initialSeekPosition); - } - - this.fileSize = fileReader.getSize(); - } - - @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { - - if (fileReader.hasReachedEndOfFile()) { - return null; - } - - // It's OK to request the buffer in a blocking fashion as the buffer pool is NOT shared - // among all consumed subpartitions. - final Buffer buffer = bufferPool.requestBufferBlocking(); - - fileReader.readInto(buffer); - - return buffer; - } - - @Override - public boolean registerListener(NotificationListener listener) throws IOException { - return false; - } - - @Override - public void notifySubpartitionConsumed() throws IOException { - parent.onConsumedSubpartition(); - } - - @Override - public void releaseAllResources() throws IOException { - if (isReleased.compareAndSet(false, true)) { - fileReader.close(); - bufferPool.destroy(); - } - } - - @Override - public boolean isReleased() { - return parent.isReleased() || isReleased.get(); - } - - @Override - public Throwable getFailureCause() { - return parent.getFailureCause(); - } - - @Override - public String toString() { - return String.format("SpilledSubpartitionView[sync](index: %d, file size: %d bytes) of ResultPartition %s", - parent.index, - fileSize, - parent.parent.getPartitionId()); - } - - /** - * A buffer pool to provide buffer to read the file into. 
- * - * <p> This pool ensures that a consuming input gate makes progress in all cases, even when all - * buffers of the input gate buffer pool have been requested by remote input channels. - * - * TODO Replace with asynchronous buffer pool request as this introduces extra buffers per - * consumed subpartition. - */ - private static class SpillReadBufferPool implements BufferRecycler { - - private final Queue<Buffer> buffers; - - private boolean isDestroyed; - - public SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) { - this.buffers = new ArrayDeque<Buffer>(numberOfBuffers); - - synchronized (buffers) { - for (int i = 0; i < numberOfBuffers; i++) { - buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this)); - } - } - } - - @Override - public void recycle(MemorySegment memorySegment) { - synchronized (buffers) { - if (isDestroyed) { - memorySegment.free(); - } - else { - buffers.add(new Buffer(memorySegment, this)); - buffers.notifyAll(); - } - } - } - - private Buffer requestBufferBlocking() throws InterruptedException { - synchronized (buffers) { - while (true) { - if (isDestroyed) { - return null; - } - - Buffer buffer = buffers.poll(); - - if (buffer != null) { - return buffer; - } - // Else: wait for a buffer - buffers.wait(); - } - } - } - - private void destroy() { - synchronized (buffers) { - isDestroyed = true; - buffers.notifyAll(); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java index 885e738..3e93ae6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java @@ -34,18 +34,35 @@ public class BufferOrEvent { private final AbstractEvent event; + /** + * Indicate availability of further instances for the union input gate. + * This is not needed outside of the input gate unioning logic and cannot + * be set outside of the consumer package. + */ + private final boolean moreAvailable; + private int channelIndex; - public BufferOrEvent(Buffer buffer, int channelIndex) { + BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) { this.buffer = checkNotNull(buffer); this.event = null; this.channelIndex = channelIndex; + this.moreAvailable = moreAvailable; } - public BufferOrEvent(AbstractEvent event, int channelIndex) { + BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) { this.buffer = null; this.event = checkNotNull(event); this.channelIndex = channelIndex; + this.moreAvailable = moreAvailable; + } + + public BufferOrEvent(Buffer buffer, int channelIndex) { + this(buffer, channelIndex, true); + } + + public BufferOrEvent(AbstractEvent event, int channelIndex) { + this(event, channelIndex, true); } public boolean isBuffer() { @@ -73,6 +90,10 @@ public class BufferOrEvent { this.channelIndex = channelIndex; } + boolean moreAvailable() { + return moreAvailable; + } + @Override public String toString() { 
return String.format("BufferOrEvent [%s, channelIndex = %d]", http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 5d82903..a48bfaf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -101,10 +101,19 @@ public abstract class InputChannel { } /** - * Notifies the owning {@link SingleInputGate} about an available {@link Buffer} instance. + * Notifies the owning {@link SingleInputGate} that this channel became non-empty. + * + * <p>This is guaranteed to be called only when a Buffer was added to a previously + * empty input channel. The notion of empty is atomically consistent with the flag + * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer + * from this channel. + * + * <p><b>Note:</b> When the input channel observes an exception, this + * method is called regardless of whether the channel was empty before. That ensures + * that the parent InputGate will always be notified about the exception. */ - protected void notifyAvailableBuffer() { - inputGate.onAvailableBuffer(this); + protected void notifyChannelNonEmpty() { + inputGate.notifyChannelNonEmpty(this); } // ------------------------------------------------------------------------ @@ -123,7 +132,7 @@ public abstract class InputChannel { /** * Returns the next buffer from the consumed subpartition. 
*/ - abstract Buffer getNextBuffer() throws IOException, InterruptedException; + abstract BufferAndAvailability getNextBuffer() throws IOException, InterruptedException; // ------------------------------------------------------------------------ // Task events @@ -182,7 +191,7 @@ public abstract class InputChannel { protected void setError(Throwable cause) { if (this.cause.compareAndSet(null, checkNotNull(cause))) { // Notify the input gate. - notifyAvailableBuffer(); + notifyChannelNonEmpty(); } } @@ -225,4 +234,28 @@ public abstract class InputChannel { // Reached maximum backoff return false; } + + // ------------------------------------------------------------------------ + + /** + * A combination of a {@link Buffer} and a flag indicating availability of further buffers. + */ + public static final class BufferAndAvailability { + + private final Buffer buffer; + private final boolean moreAvailable; + + public BufferAndAvailability(Buffer buffer, boolean moreAvailable) { + this.buffer = checkNotNull(buffer); + this.moreAvailable = moreAvailable; + } + + public Buffer buffer() { + return buffer; + } + + public boolean moreAvailable() { + return moreAvailable; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 1cd5fc5..1f2182e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.event.TaskEvent; 
-import org.apache.flink.runtime.util.event.EventListener; import java.io.IOException; @@ -77,7 +76,7 @@ public interface InputGate { void sendTaskEvent(TaskEvent event) throws IOException; - void registerListener(EventListener<InputGate> listener); + void registerListener(InputGateListener listener); int getPageSize(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java new file mode 100644 index 0000000..00fa782 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +/** + * Listener interface implemented by consumers of {@link InputGate} instances + * that want to be notified of availability of buffer or event instances. + */ +public interface InputGateListener { + + /** + * Notification callback if the input gate moves from zero to non-zero + * available input channels with data. + * + * @param inputGate Input Gate that became available. + */ + void notifyInputGateNonEmpty(InputGate inputGate); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 6fcd2f9..b34dbff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -18,18 +18,16 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; -import org.apache.flink.runtime.util.event.NotificationListener; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; @@ -37,6 +35,7 @@ import scala.Tuple2; import java.io.IOException; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -44,11 +43,13 @@ import static org.apache.flink.util.Preconditions.checkState; /** * An input channel, which requests a local subpartition. */ -public class LocalInputChannel extends InputChannel implements NotificationListener { +public class LocalInputChannel extends InputChannel implements BufferAvailabilityListener { private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class); - private final Object requestLock = new Object(); + // ------------------------------------------------------------------------ + + private final Object requestReleaseLock = new Object(); /** The local partition manager. */ private final ResultPartitionManager partitionManager; @@ -56,38 +57,40 @@ public class LocalInputChannel extends InputChannel implements NotificationListe /** Task event dispatcher for backwards events. */ private final TaskEventDispatcher taskEventDispatcher; + /** Number of available buffers used to keep track of non-empty gate notifications. 
*/ + private final AtomicLong numBuffersAvailable; + /** The consumed subpartition */ private volatile ResultSubpartitionView subpartitionView; private volatile boolean isReleased; - private volatile Buffer lookAhead; - LocalInputChannel( - SingleInputGate inputGate, - int channelIndex, - ResultPartitionID partitionId, - ResultPartitionManager partitionManager, - TaskEventDispatcher taskEventDispatcher, - IOMetricGroup metrics) { + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + ResultPartitionManager partitionManager, + TaskEventDispatcher taskEventDispatcher, + IOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, - new Tuple2<Integer, Integer>(0, 0), metrics); + new Tuple2<Integer, Integer>(0, 0), metrics); } LocalInputChannel( - SingleInputGate inputGate, - int channelIndex, - ResultPartitionID partitionId, - ResultPartitionManager partitionManager, - TaskEventDispatcher taskEventDispatcher, - Tuple2<Integer, Integer> initialAndMaxBackoff, - IOMetricGroup metrics) { + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + ResultPartitionManager partitionManager, + TaskEventDispatcher taskEventDispatcher, + Tuple2<Integer, Integer> initialAndMaxBackoff, + IOMetricGroup metrics) { super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocalCounter()); this.partitionManager = checkNotNull(partitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); + this.numBuffersAvailable = new AtomicLong(); } // ------------------------------------------------------------------------ @@ -97,30 +100,36 @@ public class LocalInputChannel extends InputChannel implements NotificationListe @Override void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { // The lock is required to request only once in the presence of retriggered requests. 
- synchronized (requestLock) { + synchronized (requestReleaseLock) { + checkState(!isReleased, "released"); + if (subpartitionView == null) { LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.", - this, subpartitionIndex, partitionId); + this, subpartitionIndex, partitionId); try { - subpartitionView = partitionManager.createSubpartitionView( - partitionId, subpartitionIndex, inputGate.getBufferProvider()); - } - catch (PartitionNotFoundException notFound) { + ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView( + partitionId, subpartitionIndex, inputGate.getBufferProvider(), this); + + if (subpartitionView == null) { + throw new IOException("Error requesting subpartition."); + } + + // make the subpartition view visible + this.subpartitionView = subpartitionView; + + // check if the channel was released in the meantime + if (isReleased) { + subpartitionView.releaseAllResources(); + this.subpartitionView = null; + } + } catch (PartitionNotFoundException notFound) { if (increaseBackoff()) { inputGate.retriggerPartitionRequest(partitionId.getPartitionId()); - return; - } - else { + } else { throw notFound; } } - - if (subpartitionView == null) { - throw new IOException("Error requesting subpartition."); - } - - getNextLookAhead(); } } } @@ -128,17 +137,16 @@ public class LocalInputChannel extends InputChannel implements NotificationListe /** * Retriggers a subpartition request. 
*/ - void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException { - synchronized (requestLock) { - checkState(subpartitionView == null, "Already requested partition."); + void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) { + synchronized (requestReleaseLock) { + checkState(subpartitionView == null, "already requested partition"); timer.schedule(new TimerTask() { @Override public void run() { try { requestSubpartition(subpartitionIndex); - } - catch (Throwable t) { + } catch (Throwable t) { setError(t); } } @@ -147,29 +155,49 @@ public class LocalInputChannel extends InputChannel implements NotificationListe } @Override - Buffer getNextBuffer() throws IOException, InterruptedException { + BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { checkError(); - checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition."); - // After subscribe notification - if (lookAhead == null) { - lookAhead = subpartitionView.getNextBuffer(); + ResultSubpartitionView subpartitionView = this.subpartitionView; + if (subpartitionView == null) { + // this can happen if the request for the partition was triggered asynchronously + // by the timer trigger + // would be good to avoid that, by guaranteeing that the requestPartition() and + // getNextBuffer() always come from the same thread + // we could do that by letting the timer insert a special "requesting channel" into the input gate's queue + subpartitionView = checkAndWaitForSubpartitionView(); } - Buffer next = lookAhead; - lookAhead = null; + Buffer next = subpartitionView.getNextBuffer(); + long remaining = numBuffersAvailable.decrementAndGet(); - if (!next.isBuffer() && EventSerializer - .fromBuffer(next, getClass().getClassLoader()) - .getClass() == EndOfPartitionEvent.class) { - - return next; + if (remaining >= 0) { + numBytesIn.inc(next.getSize()); + return new
BufferAndAvailability(next, remaining > 0); + } else if (subpartitionView.isReleased()) { + throw new ProducerFailedException(subpartitionView.getFailureCause()); + } else { + throw new IllegalStateException("No buffer available and producer partition not released."); } + } - getNextLookAhead(); + @Override + public void notifyBuffersAvailable(long numBuffers) { + // if this request made the channel non-empty, notify the input gate + if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { + notifyChannelNonEmpty(); + } + } - numBytesIn.inc(next.getSize()); - return next; + private ResultSubpartitionView checkAndWaitForSubpartitionView() { + // synchronizing on the request lock means this blocks until the asynchronous request + // for the partition view has been completed + // by then the subpartition view is visible or the channel is released + synchronized (requestReleaseLock) { + checkState(!isReleased, "released"); + checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition."); + return subpartitionView; + } } // ------------------------------------------------------------------------ @@ -208,18 +236,15 @@ public class LocalInputChannel extends InputChannel implements NotificationListe */ @Override void releaseAllResources() throws IOException { - if (!isReleased) { - if (lookAhead != null) { - lookAhead.recycle(); - lookAhead = null; - } + synchronized (requestReleaseLock) { + if (!isReleased) { + isReleased = true; - if (subpartitionView != null) { - subpartitionView.releaseAllResources(); - subpartitionView = null; + if (subpartitionView != null) { + subpartitionView.releaseAllResources(); + subpartitionView = null; + } } - - isReleased = true; } } @@ -227,55 +252,4 @@ public class LocalInputChannel extends InputChannel implements NotificationListe public String toString() { return "LocalInputChannel [" + partitionId + "]"; } - - // ------------------------------------------------------------------------ - 
// Queue iterator listener (called by producing or disk I/O thread) - // ------------------------------------------------------------------------ - - @Override - public void onNotification() { - if (isReleased) { - return; - } - - try { - getNextLookAhead(); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - // ------------------------------------------------------------------------ - - private void getNextLookAhead() throws IOException, InterruptedException { - - final ResultSubpartitionView view = subpartitionView; - - if (view == null) { - return; - } - - while (true) { - lookAhead = view.getNextBuffer(); - - if (lookAhead != null) { - notifyAvailableBuffer(); - break; - } - - if (view.registerListener(this)) { - return; - } - else if (view.isReleased()) { - Throwable cause = view.getFailureCause(); - - if (cause != null) { - setError(new ProducerFailedException(cause)); - } - - return; - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 1cd042c..4dc159b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -27,8 +26,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import scala.Tuple2; import java.io.IOException; @@ -44,8 +42,6 @@ import static org.apache.flink.util.Preconditions.checkState; */ public class RemoteInputChannel extends InputChannel { - private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class); - /** ID to distinguish this channel from other channels sharing the same TCP connection. */ private final InputChannelID id = new InputChannelID(); @@ -77,25 +73,25 @@ public class RemoteInputChannel extends InputChannel { private int expectedSequenceNumber = 0; public RemoteInputChannel( - SingleInputGate inputGate, - int channelIndex, - ResultPartitionID partitionId, - ConnectionID connectionId, - ConnectionManager connectionManager, - IOMetricGroup metrics) { + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + ConnectionID connectionId, + ConnectionManager connectionManager, + IOMetricGroup metrics) { this(inputGate, channelIndex, partitionId, connectionId, connectionManager, - new Tuple2<Integer, Integer>(0, 0), metrics); + new Tuple2<Integer, Integer>(0, 0), metrics); } public RemoteInputChannel( - SingleInputGate inputGate, - int channelIndex, - ResultPartitionID partitionId, - ConnectionID connectionId, - ConnectionManager connectionManager, - Tuple2<Integer, Integer> initialAndMaxBackoff, - IOMetricGroup metrics) { + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + ConnectionID connectionId, + ConnectionManager connectionManager, + Tuple2<Integer, Integer> initialAndMaxBackoff, + IOMetricGroup metrics) { super(inputGate, 
channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter()); @@ -115,7 +111,7 @@ public class RemoteInputChannel extends InputChannel { if (partitionRequestClient == null) { // Create a client and request the partition partitionRequestClient = connectionManager - .createPartitionRequestClient(connectionId); + .createPartitionRequestClient(connectionId); partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0); } @@ -129,31 +125,29 @@ public class RemoteInputChannel extends InputChannel { if (increaseBackoff()) { partitionRequestClient.requestSubpartition( - partitionId, subpartitionIndex, this, getCurrentBackoff()); - } - else { + partitionId, subpartitionIndex, this, getCurrentBackoff()); + } else { failPartitionRequest(); } } @Override - Buffer getNextBuffer() throws IOException { + BufferAndAvailability getNextBuffer() throws IOException { checkState(!isReleased.get(), "Queried for a buffer after channel has been closed."); checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue."); checkError(); - synchronized (receivedBuffers) { - Buffer buffer = receivedBuffers.poll(); - - // Sanity check that channel is only queried after a notification - if (buffer == null) { - throw new IOException("Queried input channel for data although non is available."); - } + final Buffer next; + final int remaining; - numBytesIn.inc(buffer.getSize()); - return buffer; + synchronized (receivedBuffers) { + next = receivedBuffers.poll(); + remaining = receivedBuffers.size(); } + + numBytesIn.inc(next.getSize()); + return new BufferAndAvailability(next, remaining > 0); } // ------------------------------------------------------------------------ @@ -201,8 +195,7 @@ public class RemoteInputChannel extends InputChannel { // buffers received concurrently with closing are properly recycled. 
if (partitionRequestClient != null) { partitionRequestClient.close(this); - } - else { + } else { connectionManager.closeOpenChannelConnections(connectionId); } } @@ -246,20 +239,22 @@ public class RemoteInputChannel extends InputChannel { synchronized (receivedBuffers) { if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { + int available = receivedBuffers.size(); + receivedBuffers.add(buffer); expectedSequenceNumber++; - notifyAvailableBuffer(); + if (available == 0) { + notifyChannelNonEmpty(); + } success = true; - } - else { + } else { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } } - } - finally { + } finally { if (!success) { buffer.recycle(); } @@ -271,8 +266,7 @@ public class RemoteInputChannel extends InputChannel { if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { expectedSequenceNumber++; - } - else { + } else { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } @@ -303,7 +297,7 @@ public class RemoteInputChannel extends InputChannel { @Override public String getMessage() { return String.format("Buffer re-ordering: expected buffer with sequence number %d, but received %d.", - expectedSequenceNumber, actualSequenceNumber); + expectedSequenceNumber, actualSequenceNumber); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 212aade..105d28b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -36,23 +35,22 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.List; import java.util.Map; import java.util.Timer; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -137,7 +135,7 @@ public class SingleInputGate implements InputGate { private final Map<IntermediateResultPartitionID, InputChannel> inputChannels; /** 
Channels, which notified this input gate about available data. */ - private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>(); + private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>(); private final BitSet channelsWithEndOfPartitionEvents; @@ -159,9 +157,9 @@ public class SingleInputGate implements InputGate { private volatile boolean isReleased; /** Registered listener to forward buffer notifications to. */ - private volatile EventListener<InputGate> registeredListener; + private volatile InputGateListener inputGateListener; - private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>(); + private final List<TaskEvent> pendingEvents = new ArrayList<>(); private int numberOfUninitializedChannels; @@ -213,6 +211,10 @@ public class SingleInputGate implements InputGate { return bufferPool; } + public BufferPool getBufferPool() { + return bufferPool; + } + @Override public int getPageSize() { if (bufferPool != null) { @@ -240,7 +242,7 @@ public class SingleInputGate implements InputGate { this.bufferPool = checkNotNull(bufferPool); } - public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) { + void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) { synchronized (requestLock) { if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null && inputChannel.getClass() == UnknownInputChannel.class) { @@ -332,6 +334,7 @@ public class SingleInputGate implements InputGate { } public void releaseAllResources() throws IOException { + boolean released = false; synchronized (requestLock) { if (!isReleased) { try { @@ -358,9 +361,16 @@ public class SingleInputGate implements InputGate { } finally { isReleased = true; + released = true; } } } + + if (released) { + synchronized (inputChannelsWithData) { + inputChannelsWithData.notifyAll(); + } + } } @Override @@ -406,32 +416,50 @@ public class 
SingleInputGate implements InputGate { @Override public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { - if (hasReceivedAllEndOfPartitionEvents) { return null; } + if (isReleased) { + throw new IllegalStateException("Released"); + } + requestPartitions(); - InputChannel currentChannel = null; - while (currentChannel == null) { - if (isReleased) { - throw new IllegalStateException("Released"); + InputChannel currentChannel; + boolean moreAvailable; + + synchronized (inputChannelsWithData) { + while (inputChannelsWithData.size() == 0) { + if (isReleased) { + throw new IllegalStateException("Released"); + } + + inputChannelsWithData.wait(); } - currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS); + currentChannel = inputChannelsWithData.remove(); + moreAvailable = inputChannelsWithData.size() > 0; } - final Buffer buffer = currentChannel.getNextBuffer(); + final BufferAndAvailability result = currentChannel.getNextBuffer(); // Sanity check that notifications only happen when data is available - if (buffer == null) { + if (result == null) { throw new IllegalStateException("Bug in input gate/channel logic: input gate got " + "notified by channel about available data, but none was available."); } + // this channel was now removed from the non-empty channels queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that channel + if (result.moreAvailable()) { + queueChannel(currentChannel); + } + + final Buffer buffer = result.buffer(); if (buffer.isBuffer()) { - return new BufferOrEvent(buffer, currentChannel.getChannelIndex()); + return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); } else { final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); @@ -448,7 +476,7 @@ public class SingleInputGate implements InputGate { currentChannel.releaseAllResources(); } - return new BufferOrEvent(event, 
currentChannel.getChannelIndex()); + return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); } } @@ -470,21 +498,16 @@ public class SingleInputGate implements InputGate { // ------------------------------------------------------------------------ @Override - public void registerListener(EventListener<InputGate> listener) { - if (registeredListener == null) { - registeredListener = listener; - } - else { + public void registerListener(InputGateListener inputGateListener) { + if (this.inputGateListener == null) { + this.inputGateListener = inputGateListener; + } else { throw new IllegalStateException("Multiple listeners"); } } - public void onAvailableBuffer(InputChannel channel) { - inputChannelsWithData.add(channel); - EventListener<InputGate> listener = registeredListener; - if (listener != null) { - listener.onEvent(this); - } + void notifyChannelNonEmpty(InputChannel channel) { + queueChannel(checkNotNull(channel)); } void triggerPartitionStateCheck(ResultPartitionID partitionId) { @@ -495,6 +518,27 @@ public class SingleInputGate implements InputGate { partitionId); } + private void queueChannel(InputChannel channel) { + int availableChannels; + + synchronized (inputChannelsWithData) { + availableChannels = inputChannelsWithData.size(); + + inputChannelsWithData.add(channel); + + if (availableChannels == 0) { + inputChannelsWithData.notify(); + } + } + + if (availableChannels == 0) { + InputGateListener listener = inputGateListener; + if (listener != null) { + listener.notifyInputGateNonEmpty(this); + } + } + } + // ------------------------------------------------------------------------ @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index b1b8911..e8ccbb4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -22,15 +22,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.util.event.EventListener; import java.io.IOException; -import java.util.List; +import java.util.ArrayDeque; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,19 +59,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * It is possible to recursively union union input gates. */ -public class UnionInputGate implements InputGate { +public class UnionInputGate implements InputGate, InputGateListener { /** The input gates to union. */ private final InputGate[] inputGates; private final Set<InputGate> inputGatesWithRemainingData; - /** Data availability listener across all unioned input gates. */ - private final InputGateListener inputGateListener; + /** Gates, which notified this input gate about available data. */ + private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>(); /** The total number of input channels across all unioned input gates. */ private final int totalNumberOfInputChannels; + /** Registered listener to forward input gate notifications to. 
*/ + private volatile InputGateListener inputGateListener; + /** * A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0 * (inclusive) to the total number of input channels (exclusive). @@ -100,11 +99,12 @@ public class UnionInputGate implements InputGate { inputGatesWithRemainingData.add(inputGate); currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); + + // Register the union gate as a listener for all input gates + inputGate.registerListener(this); } this.totalNumberOfInputChannels = currentNumberOfInputChannels; - - this.inputGateListener = new InputGateListener(inputGates, this); } /** @@ -139,7 +139,6 @@ public class UnionInputGate implements InputGate { @Override public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { - if (inputGatesWithRemainingData.isEmpty()) { return null; } @@ -147,17 +146,31 @@ public class UnionInputGate implements InputGate { // Make sure to request the partitions, if they have not been requested before. 
requestPartitions(); - final InputGate inputGate = inputGateListener.getNextInputGateToReadFrom(); + final InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + + inputGate = inputGatesWithData.remove(); + } final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + if (bufferOrEvent.isEvent() - && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class - && inputGate.isFinished()) { + && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class + && inputGate.isFinished()) { if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + - "input gates."); + "input gates."); } } @@ -177,9 +190,12 @@ public class UnionInputGate implements InputGate { } @Override - public void registerListener(EventListener<InputGate> listener) { - // This method is called from the consuming task thread. - inputGateListener.registerListener(listener); + public void registerListener(InputGateListener listener) { + if (this.inputGateListener == null) { + this.inputGateListener = listener; + } else { + throw new IllegalStateException("Multiple listeners"); + } } @Override @@ -195,45 +211,29 @@ public class UnionInputGate implements InputGate { return pageSize; } - /** - * Data availability listener at all unioned input gates. - * - * <p> The listener registers itself at each input gate and is notified for *each incoming - * buffer* at one of the unioned input gates. 
- */ - private static class InputGateListener implements EventListener<InputGate> { - - private final UnionInputGate unionInputGate; - - private final BlockingQueue<InputGate> inputGatesWithData = new LinkedBlockingQueue<InputGate>(); + @Override + public void notifyInputGateNonEmpty(InputGate inputGate) { + queueInputGate(checkNotNull(inputGate)); + } - private final List<EventListener<InputGate>> registeredListeners = new CopyOnWriteArrayList<EventListener<InputGate>>(); + private void queueInputGate(InputGate inputGate) { + int availableInputGates; - public InputGateListener(InputGate[] inputGates, UnionInputGate unionInputGate) { - for (InputGate inputGate : inputGates) { - inputGate.registerListener(this); - } + synchronized (inputGatesWithData) { + availableInputGates = inputGatesWithData.size(); - this.unionInputGate = unionInputGate; - } - - @Override - public void onEvent(InputGate inputGate) { - // This method is called from the input channel thread, which can be either the same - // thread as the consuming task thread or a different one. 
inputGatesWithData.add(inputGate); - for (int i = 0; i < registeredListeners.size(); i++) { - registeredListeners.get(i).onEvent(unionInputGate); + if (availableInputGates == 0) { + inputGatesWithData.notify(); } } - InputGate getNextInputGateToReadFrom() throws InterruptedException { - return inputGatesWithData.take(); - } - - public void registerListener(EventListener<InputGate> listener) { - registeredListeners.add(checkNotNull(listener)); + if (availableInputGates == 0) { + InputGateListener listener = inputGateListener; + if (listener != null) { + listener.notifyInputGateNonEmpty(this); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index cc91e83..0c02112 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -23,8 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.api.reader.BufferReader; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import scala.Tuple2; @@ -75,9 +73,9 @@ public class UnknownInputChannel extends InputChannel { } @Override - public Buffer 
getNextBuffer() throws IOException { + public BufferAndAvailability getNextBuffer() throws IOException { // Nothing to do here - return null; + throw new UnsupportedOperationException("Cannot retrieve a buffer from an UnknownInputChannel"); } @Override @@ -90,8 +88,7 @@ public class UnknownInputChannel extends InputChannel { * <p> * <strong>Important</strong>: It is important that the method correctly * always <code>false</code> for unknown input channels in order to not - * finish the consumption of an intermediate result partition early in - * {@link BufferReader}. + * finish the consumption of an intermediate result partition early. */ @Override public boolean isReleased() { http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 514a8d2..9565666 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -337,7 +337,6 @@ public class Task implements Runnable { networkEnvironment.getPartitionManager(), networkEnvironment.getPartitionConsumableNotifier(), ioManager, - networkEnvironment.getDefaultIOMode(), desc.sendScheduleOrUpdateConsumersMessage()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
