Repository: flink
Updated Branches:
  refs/heads/master 38ab7164a -> 369837971


[FLINK-5326] [network] Check release flag of parent in reader

In PipelinedSubpartitionView, there is a possible race with
releasing the parent subpartition and querying for a buffer
in the view.

The parent partition release clears all buffers in locked
scope and releases the view outside of the lock. If concurrently
the view is queried for a buffer it might get null, which
is only allowed if the view was released.

Because the release is only forwarded out of the lock scope,
this can happen before the release has propagated.

As a solution, we check the parent release status as well in the
view. This is how it is handled in the spilled views, too.

This surfaced with the recent refactorings, because the previous
consumption model required multiple rounds of get, registerListener,
isReleased calls, which hid this problem.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d965d5ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d965d5ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d965d5ab

Branch: refs/heads/master
Commit: d965d5abdc389e9b65fd35a69bb16bfb71008504
Parents: 38ab716
Author: Ufuk Celebi <[email protected]>
Authored: Tue Dec 13 11:26:47 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Dec 13 11:26:48 2016 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartitionView.java    |  2 +-
 .../partition/PipelinedSubpartitionTest.java    | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d965d5ab/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 52c78ea..fda2135 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -70,7 +70,7 @@ class PipelinedSubpartitionView implements 
ResultSubpartitionView {
 
        @Override
        public boolean isReleased() {
-               return isReleased.get();
+               return isReleased.get() || parent.isReleased();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d965d5ab/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index a56177e..a97e306 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
@@ -31,19 +34,25 @@ import 
org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -132,6 +141,25 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                testProduceConsume(true, true);
        }
 
+       /**
+        * Verifies that the isReleased() check of the view checks the parent
+        * subpartition.
+        */
+       @Test
+       public void testIsReleasedChecksParent() throws Exception {
+               PipelinedSubpartition subpartition = 
mock(PipelinedSubpartition.class);
+
+               PipelinedSubpartitionView reader = new 
PipelinedSubpartitionView(
+                               subpartition, 
mock(BufferAvailabilityListener.class));
+
+               assertFalse(reader.isReleased());
+               verify(subpartition, times(1)).isReleased();
+
+               when(subpartition.isReleased()).thenReturn(true);
+               assertTrue(reader.isReleased());
+               verify(subpartition, times(2)).isReleased();
+       }
+
        private void testProduceConsume(boolean isSlowProducer, boolean 
isSlowConsumer) throws Exception {
                // Config
                final int producerBufferPoolSize = 8;

Reply via email to