This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6c7603724be6c146cfc8019b77b60236d1d3d582 Author: Nico Kruber <[email protected]> AuthorDate: Tue Aug 21 17:34:49 2018 +0200 [FLINK-10141][network] move notifications outside the bufferQueue and receivedBuffers locks This means that notifyCreditAvailable() and notifyChannelNonEmpty() are called without any locks being acquired. The change is safe since these callbacks do their own synchronization already (if needed). This closes #6553. --- .../partition/consumer/RemoteInputChannel.java | 62 +++++++++++++--------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 79d25c6..c4954c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -306,8 +306,8 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, int numAddedBuffers; synchronized (bufferQueue) { - // Important: check the isReleased state inside synchronized block, so there is no - // race condition when recycle and releaseAllResources running in parallel. + // Similar to notifyBufferAvailable(), make sure that we never add a buffer + // after releaseAllResources() released all buffers (see below for details). 
if (isReleased.get()) { try { inputGate.returnExclusiveSegments(Collections.singletonList(segment)); @@ -368,8 +368,13 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers."); - // Important: double check the isReleased state inside synchronized block, so there is no - // race condition when notifyBufferAvailable and releaseAllResources running in parallel. + // Important: make sure that we never add a buffer after releaseAllResources() + // released all buffers. Following scenarios exist: + // 1) releaseAllResources() already released buffers inside bufferQueue + // -> then isReleased is set correctly + // 2) releaseAllResources() did not yet release buffers from bufferQueue + // -> we may or may not have set isReleased yet but will always wait for the + // lock on bufferQueue to release buffers if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) { isWaitingForFloatingBuffers = false; recycleBuffer = false; // just in case @@ -385,10 +390,10 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } else { needMoreBuffers = true; } + } - if (unannouncedCredit.getAndAdd(1) == 0) { - notifyCreditAvailable(); - } + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); } return needMoreBuffers; @@ -484,8 +489,8 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, int numRequestedBuffers = 0; synchronized (bufferQueue) { - // Important: check the isReleased state inside synchronized block, so there is no - // race condition when onSenderBacklog and releaseAllResources running in parallel. + // Similar to notifyBufferAvailable(), make sure that we never add a buffer + // after releaseAllResources() released all buffers (see above for details). 
if (isReleased.get()) { return; } @@ -510,33 +515,40 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException { - boolean success = false; + boolean recycleBuffer = true; try { + + final boolean wasEmpty; synchronized (receivedBuffers) { - if (!isReleased.get()) { - if (expectedSequenceNumber == sequenceNumber) { - final boolean wasEmpty = receivedBuffers.isEmpty(); + // Similar to notifyBufferAvailable(), make sure that we never add a buffer + // after releaseAllResources() released all buffers from receivedBuffers + // (see above for details). + if (isReleased.get()) { + return; + } - receivedBuffers.add(buffer); - expectedSequenceNumber++; + if (expectedSequenceNumber != sequenceNumber) { + onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); + return; + } - if (wasEmpty) { - notifyChannelNonEmpty(); - } + wasEmpty = receivedBuffers.isEmpty(); + receivedBuffers.add(buffer); + recycleBuffer = false; + } - success = true; - } else { - onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); - } - } + ++expectedSequenceNumber; + + if (wasEmpty) { + notifyChannelNonEmpty(); } - if (success && backlog >= 0) { + if (backlog >= 0) { onSenderBacklog(backlog); } } finally { - if (!success) { + if (recycleBuffer) { buffer.recycleBuffer(); } }
