This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.5 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit d78fad697d383658f99174dc20b2690807c00d35 Author: Weijie Guo <[email protected]> AuthorDate: Tue Aug 27 15:19:22 2024 +0800 [CELEBORN-1579] Fix the memory leak of result partition --- .../plugin/flink/RemoteShuffleResultPartition.java | 18 +++++++++++++++++- .../plugin/flink/RemoteShuffleResultPartition.java | 18 +++++++++++++++++- .../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++- .../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++- .../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++- 5 files changed, 91 insertions(+), 5 deletions(-) diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java index fa8b8692a..286a7677f 100644 --- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java +++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java @@ -17,6 +17,9 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @@ -55,6 +58,8 @@ public class RemoteShuffleResultPartition extends ResultPartition { RemoteShuffleResultPartitionDelegation delegation; + private final SupplierWithException<BufferPool, IOException> bufferPoolFactory; + public RemoteShuffleResultPartition( String owningTaskName, int partitionIndex, @@ -82,11 +87,22 @@ public class RemoteShuffleResultPartition extends ResultPartition { delegation = new RemoteShuffleResultPartitionDelegation( networkBufferSize, outputGate, this::updateStatistics, numSubpartitions); + this.bufferPoolFactory = bufferPoolFactory; } @Override public 
void setup() throws IOException { - super.setup(); + // We can't call the `setup` method of the base class, otherwise it will cause a partition leak. + // The reason is that this partition will be registered to the partition manager during + // `super.setup()`. + // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM), + // Flink does not trigger the resource releasing over TM. Therefore, the partition object is + // leaked. + // So we copy the logic of `setup` but don't register partition to partition manager. + checkState( + this.bufferPool == null, + "Bug in result partition setup logic: Already registered buffer pool."); + this.bufferPool = checkNotNull(bufferPoolFactory.get()); BufferUtils.reserveNumRequiredBuffers(bufferPool, 1); delegation.setup( bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState); diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java index 3e35b788f..e507526f5 100644 --- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java +++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java @@ -17,6 +17,9 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @@ -51,6 +54,8 @@ public class RemoteShuffleResultPartition extends ResultPartition { private final RemoteShuffleResultPartitionDelegation delegation; + private final SupplierWithException<BufferPool, IOException> bufferPoolFactory; + public RemoteShuffleResultPartition( String owningTaskName, int partitionIndex, @@ -78,11 
+83,22 @@ public class RemoteShuffleResultPartition extends ResultPartition { delegation = new RemoteShuffleResultPartitionDelegation( networkBufferSize, outputGate, this::updateStatistics, numSubpartitions); + this.bufferPoolFactory = bufferPoolFactory; } @Override public void setup() throws IOException { - super.setup(); + // We can't call the `setup` method of the base class, otherwise it will cause a partition leak. + // The reason is that this partition will be registered to the partition manager during + // `super.setup()`. + // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM), + // Flink does not trigger the resource releasing over TM. Therefore, the partition object is + // leaked. + // So we copy the logic of `setup` but don't register partition to partition manager. + checkState( + this.bufferPool == null, + "Bug in result partition setup logic: Already registered buffer pool."); + this.bufferPool = checkNotNull(bufferPoolFactory.get()); BufferUtils.reserveNumRequiredBuffers(bufferPool, 1); delegation.setup( bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState); diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java index 37ff33903..8aaa013c5 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java @@ -17,6 +17,9 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @@ -52,6 +55,8 @@ public class 
RemoteShuffleResultPartition extends ResultPartition { private final RemoteShuffleResultPartitionDelegation delegation; + private final SupplierWithException<BufferPool, IOException> bufferPoolFactory; + public RemoteShuffleResultPartition( String owningTaskName, int partitionIndex, @@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends ResultPartition { delegation = new RemoteShuffleResultPartitionDelegation( networkBufferSize, outputGate, this::updateStatistics, numSubpartitions); + this.bufferPoolFactory = bufferPoolFactory; } @Override public void setup() throws IOException { - super.setup(); + // We can't call the `setup` method of the base class, otherwise it will cause a partition leak. + // The reason is that this partition will be registered to the partition manager during + // `super.setup()`. + // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM), + // Flink does not trigger the resource releasing over TM. Therefore, the partition object is + // leaked. + // So we copy the logic of `setup` but don't register partition to partition manager. + checkState( + this.bufferPool == null, + "Bug in result partition setup logic: Already registered buffer pool."); + this.bufferPool = checkNotNull(bufferPoolFactory.get()); + // This is an empty method, but still call it in case we implement it in the future.
+ setupInternal(); BufferUtils.reserveNumRequiredBuffers(bufferPool, 1); delegation.setup( bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState); diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java index 37ff33903..8aaa013c5 100644 --- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java +++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java @@ -17,6 +17,9 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends ResultPartition { private final RemoteShuffleResultPartitionDelegation delegation; + private final SupplierWithException<BufferPool, IOException> bufferPoolFactory; + public RemoteShuffleResultPartition( String owningTaskName, int partitionIndex, @@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends ResultPartition { delegation = new RemoteShuffleResultPartitionDelegation( networkBufferSize, outputGate, this::updateStatistics, numSubpartitions); + this.bufferPoolFactory = bufferPoolFactory; } @Override public void setup() throws IOException { - super.setup(); + // We can't call the `setup` method of the base class, otherwise it will cause a partition leak. + // The reason is that this partition will be registered to the partition manager during + // `super.setup()`. + // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM), + // Flink does not trigger the resource releasing over TM. 
Therefore, the partition object is + // leaked. + // So we copy the logic of `setup` but don't register partition to partition manager. + checkState( + this.bufferPool == null, + "Bug in result partition setup logic: Already registered buffer pool."); + this.bufferPool = checkNotNull(bufferPoolFactory.get()); + // This is an empty method, but still call it in case we implement it in the future. + setupInternal(); BufferUtils.reserveNumRequiredBuffers(bufferPool, 1); delegation.setup( bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState); diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java index 37ff33903..8aaa013c5 100644 --- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java +++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java @@ -17,6 +17,9 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; +import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends ResultPartition { private final RemoteShuffleResultPartitionDelegation delegation; + private final SupplierWithException<BufferPool, IOException> bufferPoolFactory; + public RemoteShuffleResultPartition( String owningTaskName, int partitionIndex, @@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends ResultPartition { delegation = new RemoteShuffleResultPartitionDelegation( networkBufferSize, outputGate, this::updateStatistics, numSubpartitions); + this.bufferPoolFactory = bufferPoolFactory; } @Override public void setup() throws IOException { - 
super.setup(); + // We can't call the `setup` method of the base class, otherwise it will cause a partition leak. + // The reason is that this partition will be registered to the partition manager during + // `super.setup()`. + // Since this is a cluster/remote partition(i.e. resources are not stored on the Flink TM), + // Flink does not trigger the resource releasing over TM. Therefore, the partition object is + // leaked. + // So we copy the logic of `setup` but don't register partition to partition manager. + checkState( + this.bufferPool == null, + "Bug in result partition setup logic: Already registered buffer pool."); + this.bufferPool = checkNotNull(bufferPoolFactory.get()); + // This is an empty method, but still call it in case we implement it in the future. + setupInternal(); BufferUtils.reserveNumRequiredBuffers(bufferPool, 1); delegation.setup( bufferPool, bufferCompressor, this::canBeCompressed, this::checkInProduceState);
