Repository: flink Updated Branches: refs/heads/release-1.1 c0d5f1dc9 -> 9ed7752eb
[FLINK-5326] [network] Check release flag of parent in reader In PipelinedSubpartitionView, there is a possible race with releasing the parent subpartition and querying for a buffer in the view. The parent partition release clears all buffers in locked scope and releases the view outside of the lock. If concurrently the view is queried for a buffer it might get null, which is only allowed if the view was released. Because the release is only forwarded out of the lock scope, this can happen before the release has propagated. As a solution, we check the parent release status as well in the view. This is how it is handled in the spilled views, too. This surfaced with the recent refactorings, because the previous consumption model required multiple rounds of get, registerListener, isReleased calls, which hid this problem. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04db15a3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04db15a3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04db15a3 Branch: refs/heads/release-1.1 Commit: 04db15a3e2e9a29992764cba1cd99f165e0d00b2 Parents: c0d5f1d Author: Ufuk Celebi <[email protected]> Authored: Tue Dec 13 11:26:47 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Tue Dec 13 13:59:47 2016 +0100 ---------------------------------------------------------------------- .../partition/PipelinedSubpartitionView.java | 2 +- .../partition/PipelinedSubpartitionTest.java | 28 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/04db15a3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index 52c78ea..fda2135 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -70,7 +70,7 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { @Override public boolean isReleased() { - return isReleased.get(); + return isReleased.get() || parent.isReleased(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/04db15a3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index a56177e..a97e306 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; +import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; @@ -31,19 +34,25 @@ import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer; import org.junit.AfterClass; import org.junit.Test; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class PipelinedSubpartitionTest extends SubpartitionTestBase { @@ -132,6 +141,25 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { testProduceConsume(true, true); } + /** + * Verifies that the isReleased() check of the view checks the parent + * subpartition. + */ + @Test + public void testIsReleasedChecksParent() throws Exception { + PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class); + + PipelinedSubpartitionView reader = new PipelinedSubpartitionView( + subpartition, mock(BufferAvailabilityListener.class)); + + assertFalse(reader.isReleased()); + verify(subpartition, times(1)).isReleased(); + + when(subpartition.isReleased()).thenReturn(true); + assertTrue(reader.isReleased()); + verify(subpartition, times(2)).isReleased(); + } + private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception { // Config final int producerBufferPoolSize = 8;
