Repository: flink
Updated Branches:
  refs/heads/release-1.1 c0d5f1dc9 -> 9ed7752eb


[FLINK-5326] [network] Check release flag of parent in reader

In PipelinedSubpartitionView, there is a possible race between
releasing the parent subpartition and querying the view for a
buffer.

Releasing the parent subpartition clears all buffers while
holding the lock, but releases the view outside of the lock. If
the view is concurrently queried for a buffer, it might get null,
which is only allowed if the view has been released.

Because the release is forwarded to the view only outside of the
lock scope, the null buffer can be observed before the release
has propagated to the view.

As a solution, the view's isReleased() now checks the release
status of the parent as well. This is how the spilled views
handle it, too.

This surfaced with the recent refactorings, because the previous
consumption model required multiple rounds of get, registerListener,
and isReleased calls, which hid this problem.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04db15a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04db15a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04db15a3

Branch: refs/heads/release-1.1
Commit: 04db15a3e2e9a29992764cba1cd99f165e0d00b2
Parents: c0d5f1d
Author: Ufuk Celebi <[email protected]>
Authored: Tue Dec 13 11:26:47 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Dec 13 13:59:47 2016 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartitionView.java    |  2 +-
 .../partition/PipelinedSubpartitionTest.java    | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04db15a3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 52c78ea..fda2135 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -70,7 +70,7 @@ class PipelinedSubpartitionView implements 
ResultSubpartitionView {
 
        @Override
        public boolean isReleased() {
-               return isReleased.get();
+               return isReleased.get() || parent.isReleased();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/04db15a3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index a56177e..a97e306 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
@@ -31,19 +34,25 @@ import 
org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -132,6 +141,25 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                testProduceConsume(true, true);
        }
 
+       /**
+        * Verifies that the isReleased() check of the view checks the parent
+        * subpartition.
+        */
+       @Test
+       public void testIsReleasedChecksParent() throws Exception {
+               PipelinedSubpartition subpartition = 
mock(PipelinedSubpartition.class);
+
+               PipelinedSubpartitionView reader = new 
PipelinedSubpartitionView(
+                               subpartition, 
mock(BufferAvailabilityListener.class));
+
+               assertFalse(reader.isReleased());
+               verify(subpartition, times(1)).isReleased();
+
+               when(subpartition.isReleased()).thenReturn(true);
+               assertTrue(reader.isReleased());
+               verify(subpartition, times(2)).isReleased();
+       }
+
        private void testProduceConsume(boolean isSlowProducer, boolean 
isSlowConsumer) throws Exception {
                // Config
                final int producerBufferPoolSize = 8;

Reply via email to