Repository: flink Updated Branches: refs/heads/master 38ab7164a -> 369837971
[FLINK-5326] [network] Check release flag of parent in reader In PipelinedSubpartitionView, there is a possible race with releasing the parent subpartition and querying for a buffer in the view. The parent partition release clears all buffers in locked scope and releases the view outside of the lock. If concurrently the view is queried for a buffer it might get null, which is only allowed if the view was released. Because the release is only forwarded out of the lock scope, this can happen before the release has propagated. As a solution, we check the parent release status as well in the view. This is how it is handled in the spilled views, too. This surfaced with the recent refactorings, because the previous consumption model required multiple rounds of get, registerListener, isReleased calls, which hid this problem. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d965d5ab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d965d5ab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d965d5ab Branch: refs/heads/master Commit: d965d5abdc389e9b65fd35a69bb16bfb71008504 Parents: 38ab716 Author: Ufuk Celebi <[email protected]> Authored: Tue Dec 13 11:26:47 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Tue Dec 13 11:26:48 2016 +0100 ---------------------------------------------------------------------- .../partition/PipelinedSubpartitionView.java | 2 +- .../partition/PipelinedSubpartitionTest.java | 28 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d965d5ab/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index 52c78ea..fda2135 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -70,7 +70,7 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { @Override public boolean isReleased() { - return isReleased.get(); + return isReleased.get() || parent.isReleased(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d965d5ab/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index a56177e..a97e306 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; +import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; @@ -31,19 +34,25 @@ import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer; import org.junit.AfterClass; import org.junit.Test; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class PipelinedSubpartitionTest extends SubpartitionTestBase { @@ -132,6 +141,25 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { testProduceConsume(true, true); } + /** + * Verifies that the isReleased() check of the view checks the parent + * subpartition. + */ + @Test + public void testIsReleasedChecksParent() throws Exception { + PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class); + + PipelinedSubpartitionView reader = new PipelinedSubpartitionView( + subpartition, mock(BufferAvailabilityListener.class)); + + assertFalse(reader.isReleased()); + verify(subpartition, times(1)).isReleased(); + + when(subpartition.isReleased()).thenReturn(true); + assertTrue(reader.isReleased()); + verify(subpartition, times(2)).isReleased(); + } + private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception { // Config final int producerBufferPoolSize = 8;
