This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 848e00fed [CELEBORN-1579] Fix the memory leak of result partition
848e00fed is described below

commit 848e00fedbd192327d1e4d9c4e6334e6e9d65f95
Author: SteNicholas <[email protected]>
AuthorDate: Tue Aug 27 15:23:34 2024 +0800

    [CELEBORN-1579] Fix the memory leak of result partition
---
 .../plugin/flink/RemoteShuffleResultPartition.java   | 18 +++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 18 +++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 .../plugin/flink/RemoteShuffleResultPartition.java   | 20 +++++++++++++++++++-
 5 files changed, 91 insertions(+), 5 deletions(-)

diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index fa8b8692a..286a7677f 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +58,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -82,11 +87,22 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 3e35b788f..e507526f5 100644
--- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -51,6 +54,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -78,11 +83,22 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   private final RemoteShuffleResultPartitionDelegation delegation;
 
+  private final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
+
   public RemoteShuffleResultPartition(
       String owningTaskName,
       int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     delegation =
         new RemoteShuffleResultPartitionDelegation(
             networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
+    this.bufferPoolFactory = bufferPoolFactory;
   }
 
   @Override
   public void setup() throws IOException {
-    super.setup();
+    // We can't call the `setup` method of the base class, otherwise it will 
cause a partition leak.
+    // The reason is that this partition will be registered to the partition 
manager during
+    // `super.setup()`.
+    // Since this is a cluster/remote partition(i.e. resources are not stored 
on the Flink TM),
+    // Flink does not trigger the resource releasing over TM. Therefore, the 
partition object is
+    // leaked.
+    // So we copy the logic of `setup` but don't register partition to 
partition manager.
+    checkState(
+        this.bufferPool == null,
+        "Bug in result partition setup logic: Already registered buffer 
pool.");
+    this.bufferPool = checkNotNull(bufferPoolFactory.get());
+    // this is an empty method, but still call it in case of we implement it 
in the future.
+    setupInternal();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
         bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);

Reply via email to