This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 0c1af6c  [FLINK-22424][network] Prevent releasing PipelinedSubpartition while Task can still write to it
0c1af6c is described below

commit 0c1af6c1dfdc97275fa41abfe41125f64405f7f9
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 23 14:45:44 2021 +0200

    [FLINK-22424][network] Prevent releasing PipelinedSubpartition while Task can still write to it
    
    This bug was happening when downstream tasks were failing over or being cancelled. If all
    of the downstream tasks released their subpartition views, the underlying memory
    segments/buffers could have been recycled while the upstream task was still writing
    some records.
    
    The problem is fixed by adding the writer (result partition) itself as one more
    reference-counted user of the result partition, so the partition is released only after
    the writer has been closed as well.
---
 .../ReleaseOnConsumptionResultPartition.java       |  33 ++++--
 .../io/network/buffer/UnpooledBufferPool.java      | 118 +++++++++++++++++++++
 .../network/netty/PartitionRequestQueueTest.java   |   1 +
 .../ReleaseOnConsumptionResultPartitionTest.java   |  25 ++++-
 4 files changed, 166 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
index 95b73bc..f2c8ff2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -30,6 +30,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /** ResultPartition that releases itself once all subpartitions have been 
consumed. */
 public class ReleaseOnConsumptionResultPartition extends ResultPartition {
+    private static final int PIPELINED_RESULT_PARTITION_ITSELF = -42;
 
     private static final Object lock = new Object();
 
@@ -40,7 +41,7 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
      * The total number of references to subpartitions of this result. The 
result partition can be
      * safely released, iff the reference count is zero.
      */
-    private int numUnconsumedSubpartitions;
+    private int numberOfUsers;
 
     ReleaseOnConsumptionResultPartition(
             String owningTaskName,
@@ -64,19 +65,23 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
                 bufferPoolFactory);
 
         this.consumedSubpartitions = new boolean[subpartitions.length];
-        this.numUnconsumedSubpartitions = subpartitions.length;
+        this.numberOfUsers = subpartitions.length + 1;
     }
 
     @Override
     public ResultSubpartitionView createSubpartitionView(
             int index, BufferAvailabilityListener availabilityListener) throws 
IOException {
-        checkState(numUnconsumedSubpartitions > 0, "Partition not pinned.");
+        checkState(numberOfUsers > 0, "Partition not pinned.");
 
         return super.createSubpartitionView(index, availabilityListener);
     }
 
     @Override
     void onConsumedSubpartition(int subpartitionIndex) {
+        decrementNumberOfUsers(subpartitionIndex);
+    }
+
+    private void decrementNumberOfUsers(int subpartitionIndex) {
         if (isReleased()) {
             return;
         }
@@ -86,13 +91,15 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
         // we synchronize only the bookkeeping section, to avoid holding the 
lock during any
         // calls into other components
         synchronized (lock) {
-            if (consumedSubpartitions[subpartitionIndex]) {
-                // repeated call - ignore
-                return;
-            }
+            if (subpartitionIndex != PIPELINED_RESULT_PARTITION_ITSELF) {
+                if (consumedSubpartitions[subpartitionIndex]) {
+                    // repeated call - ignore
+                    return;
+                }
 
-            consumedSubpartitions[subpartitionIndex] = true;
-            remainingUnconsumed = (--numUnconsumedSubpartitions);
+                consumedSubpartitions[subpartitionIndex] = true;
+            }
+            remainingUnconsumed = (--numberOfUsers);
         }
 
         LOG.debug(
@@ -115,7 +122,13 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
                 + ", "
                 + subpartitions.length
                 + " subpartitions, "
-                + numUnconsumedSubpartitions
+                + numberOfUsers
                 + " pending consumptions]";
     }
+
+    @Override
+    public void close() {
+        decrementNumberOfUsers(PIPELINED_RESULT_PARTITION_ITSELF);
+        super.close();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
new file mode 100644
index 0000000..68ac9c1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link BufferPool} that works on un-pooled memory segments.
+ *
+ * <p>Test-only helper: every buffer request allocates a fresh off-heap segment of {@code
+ * SEGMENT_SIZE} bytes, and recycling a segment frees it instead of returning it to a pool.
+ * Operations that only make sense for a bounded pool (buffer listeners, destruction status,
+ * resizing, subpartition recyclers) throw {@link UnsupportedOperationException}.
+ */
+public class UnpooledBufferPool implements BufferPool {
+
+    /** Size in bytes of every segment handed out by this pool. */
+    private static final int SEGMENT_SIZE = 1024;
+
+    /** No-op: segments are freed individually on recycle, so there is nothing to tear down. */
+    @Override
+    public void lazyDestroy() {}
+
+    /** Returns a buffer backed by a newly allocated segment; this pool is its recycler. */
+    @Override
+    public Buffer requestBuffer() {
+        return new NetworkBuffer(requestMemorySegment(), this);
+    }
+
+    /** Returns a builder backed by a newly allocated segment; this pool is its recycler. */
+    @Override
+    public BufferBuilder requestBufferBuilder() {
+        return new BufferBuilder(requestMemorySegment(), this);
+    }
+
+    /** Allocates a brand-new unpooled off-heap segment of {@link #SEGMENT_SIZE} bytes. */
+    private MemorySegment requestMemorySegment() {
+        return 
MemorySegmentFactory.allocateUnpooledOffHeapMemory(SEGMENT_SIZE, null);
+    }
+
+    // Never waits for capacity: delegates to the non-blocking variant, which allocates at once.
+    @Override
+    public BufferBuilder requestBufferBuilderBlocking() throws 
InterruptedException {
+        return requestBufferBuilder();
+    }
+
+    // The target channel is ignored; every channel uses the same allocation path.
+    @Override
+    public BufferBuilder requestBufferBuilder(int targetChannel) {
+        return requestBufferBuilder();
+    }
+
+    // The target channel is ignored and the call never waits for capacity.
+    @Override
+    public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
+            throws InterruptedException {
+        return requestBufferBuilder();
+    }
+
+    @Override
+    public boolean addBufferListener(BufferListener listener) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isDestroyed() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberOfRequiredMemorySegments() {
+        return Integer.MAX_VALUE;
+    }
+
+    // NOTE(review): -1 presumably signals "no maximum" -- confirm against the BufferPool contract.
+    @Override
+    public int getMaxNumberOfMemorySegments() {
+        return -1;
+    }
+
+    @Override
+    public int getNumBuffers() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public void setNumBuffers(int numBuffers) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberOfAvailableMemorySegments() {
+        return Integer.MAX_VALUE;
+    }
+
+    // Segments are freed on recycle, so none are ever counted as held by the pool.
+    @Override
+    public int bestEffortGetNumOfUsedBuffers() {
+        return 0;
+    }
+
+    @Override
+    public BufferRecycler[] getSubpartitionBufferRecyclers() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Frees the segment outright instead of returning it to a pool. */
+    @Override
+    public void recycle(MemorySegment memorySegment) {
+        memorySegment.free();
+    }
+
+    /** Always returns {@code AVAILABLE}: this pool never reports itself as exhausted. */
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        return AVAILABLE;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index e7ef08c..ad117cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -484,6 +484,7 @@ public class PartitionRequestQueueTest {
         assertFalse(queue.getAvailableReaders().contains(reader));
         // the partition and its reader view should all be released
         assertTrue(reader.isReleased());
+        partition.close();
         assertTrue(partition.isReleased());
         for (ResultSubpartition subpartition : partition.getAllPartitions()) {
             assertTrue(subpartition.isReleased());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
index 57976bd..cf49ae5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.network.buffer.UnpooledBufferPool;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -43,6 +47,25 @@ public class ReleaseOnConsumptionResultPartitionTest extends 
TestLogger {
         assertFalse(partition.isReleased());
 
         partition.onConsumedSubpartition(1);
+        partition.close();
+        assertTrue(partition.isReleased());
+    }
+
+    @Test
+    public void testConsumptionBeforePartitionClose() throws IOException, 
InterruptedException {
+        final ResultPartition partition =
+                new ResultPartitionBuilder()
+                        .setResultPartitionType(ResultPartitionType.PIPELINED)
+                        .setNumberOfSubpartitions(1)
+                        .setBufferPoolFactory(ignored -> new 
UnpooledBufferPool())
+                        .build();
+
+        partition.setup();
+        partition.getBufferBuilder(0).append(ByteBuffer.allocate(16));
+        partition.onConsumedSubpartition(0);
+        assertFalse(partition.isReleased());
+        partition.getBufferBuilder(0).append(ByteBuffer.allocate(16));
+        partition.close();
         assertTrue(partition.isReleased());
     }
 
@@ -77,7 +100,7 @@ public class ReleaseOnConsumptionResultPartitionTest extends 
TestLogger {
         partition.onConsumedSubpartition(0);
         partition.onConsumedSubpartition(0);
         partition.onConsumedSubpartition(1);
-
+        partition.close();
         assertTrue(partition.isReleased());
     }
 }

Reply via email to