This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c15ba1cd5a97ced3bb6411d588c3fb68df8a2869 Author: Nico Kruber <[email protected]> AuthorDate: Tue Sep 18 12:10:21 2018 +0200 [FLINK-10332][network] move data notification out of the synchronized block None of the notifications actually rely on being under the lock and may thus only cause lock contention. This closes #6693. --- .../network/partition/PipelinedSubpartition.java | 44 +++++++++++----------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index d2d7fdb..fe27d97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -88,6 +88,7 @@ class PipelinedSubpartition extends ResultSubpartition { private boolean add(BufferConsumer bufferConsumer, boolean finish) { checkNotNull(bufferConsumer); + final boolean notifyDataAvailable; synchronized (buffers) { if (isFinished || isReleased) { bufferConsumer.close(); @@ -98,14 +99,13 @@ class PipelinedSubpartition extends ResultSubpartition { buffers.add(bufferConsumer); updateStatistics(bufferConsumer); increaseBuffersInBacklog(bufferConsumer); + notifyDataAvailable = shouldNotifyDataAvailable() || finish; - if (finish) { - isFinished = true; - notifyDataAvailable(); - } - else { - maybeNotifyDataAvailable(); - } + isFinished |= finish; + } + + if (notifyDataAvailable) { + notifyDataAvailable(); } return true; @@ -220,6 +220,7 @@ class PipelinedSubpartition extends ResultSubpartition { @Override public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { + final boolean notifyDataAvailable; synchronized (buffers) { checkState(!isReleased); checkState(readView == null, @@ -230,9 +231,10 @@ class PipelinedSubpartition extends ResultSubpartition { parent.getOwningTaskName(), index, parent.getPartitionId()); readView = new PipelinedSubpartitionView(this, availabilityListener); - if (!buffers.isEmpty()) { - notifyDataAvailable(); - } + notifyDataAvailable = !buffers.isEmpty(); + } + if (notifyDataAvailable) { + notifyDataAvailable(); } return readView; @@ -283,26 +285,24 @@ class PipelinedSubpartition extends ResultSubpartition { @Override public void flush() { + final boolean notifyDataAvailable; synchronized (buffers) { if (buffers.isEmpty()) { return; } - if (!flushRequested) { - flushRequested = true; // set this before the notification! - // if there is more then 1 buffer, we already notified the reader - // (at the latest when adding the second buffer) - if (buffers.size() == 1) { - notifyDataAvailable(); - } - } + // if there is more then 1 buffer, we already notified the reader + // (at the latest when adding the second buffer) + notifyDataAvailable = !flushRequested && buffers.size() == 1; + flushRequested = true; + } + if (notifyDataAvailable) { + notifyDataAvailable(); } } - private void maybeNotifyDataAvailable() { + private boolean shouldNotifyDataAvailable() { // Notify only when we added first finished buffer. - if (getNumberOfFinishedBuffers() == 1) { - notifyDataAvailable(); - } + return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1; } private void notifyDataAvailable() {
