This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 848e00fed [CELEBORN-1579] Fix the memory leak of result partition
848e00fed is described below
commit 848e00fedbd192327d1e4d9c4e6334e6e9d65f95
Author: SteNicholas <[email protected]>
AuthorDate: Tue Aug 27 15:23:34 2024 +0800
[CELEBORN-1579] Fix the memory leak of result partition
---
.../plugin/flink/RemoteShuffleResultPartition.java | 18 +++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 18 +++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
5 files changed, 91 insertions(+), 5 deletions(-)
diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index fa8b8692a..286a7677f 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -55,6 +58,8 @@ public class RemoteShuffleResultPartition extends ResultPartition {
RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -82,11 +87,22 @@ public class RemoteShuffleResultPartition extends ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will cause a partition leak.
+ // The reason is that this partition will be registered to the partition manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 3e35b788f..e507526f5 100644
--- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -51,6 +54,8 @@ public class RemoteShuffleResultPartition extends ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -78,11 +83,22 @@ public class RemoteShuffleResultPartition extends ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will cause a partition leak.
+ // The reason is that this partition will be registered to the partition manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will cause a partition leak.
+ // The reason is that this partition will be registered to the partition manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will cause a partition leak.
+ // The reason is that this partition will be registered to the partition manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics, numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will cause a partition leak.
+ // The reason is that this partition will be registered to the partition manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);