This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a577fe5d3 [CELEBORN-1579] Fix the memory leak of result partition
a577fe5d3 is described below

commit a577fe5d377ec9fd79def86a05ecb1bbf225597d
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Aug 27 15:19:22 2024 +0800

    [CELEBORN-1579] Fix the memory leak of result partition
    
    ### What changes were proposed in this pull request?
    Don't register `RemoteShuffleResultPartition` with the partition manager, 
because its resources are not stored on the TM.
    
    In theory, we could also fix this on the Flink side by no longer 
registering remote/cluster partitions with the partition manager, 
but that fix would only be released in Flink 2.0 at the earliest.
    
    ### Why are the changes needed?
    `RemoteShuffleResultPartition` is registered with the partition 
manager (during the setup phase). Since it is a cluster partition (its resources are not 
stored on the Flink TM), Flink does not trigger resource release for it on the TM.
    
    In a session cluster, the partition object is leaked. As a more serious 
consequence, because the partition is never released, the idle TM 
is not reclaimed by the Flink resource manager.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Self tested in a session cluster.
    
    Closes #2706 from reswqa/fix-leak.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../plugin/flink/RemoteShuffleResultPartition.java   | 18 +++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 18 +++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 7 files changed, 129 insertions(+), 7 deletions(-)

diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index fa8b8692a..286a7677f 100644
--- 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +58,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -82,11 +87,22 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git 
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 3e35b788f..e507526f5 100644
--- 
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -51,6 +54,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -78,11 +83,22 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git 
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index eb46a290d..5aa057ae1 100644
--- 
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -82,11 +87,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
             outputGate,
             (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
             numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool,
diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);

Reply via email to