This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a577fe5d3 [CELEBORN-1579] Fix the memory leak of result partition
a577fe5d3 is described below
commit a577fe5d377ec9fd79def86a05ecb1bbf225597d
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Aug 27 15:19:22 2024 +0800
[CELEBORN-1579] Fix the memory leak of result partition
### What changes were proposed in this pull request?
Don't register `RemoteShuffleResultPartition` with the partition manager, because
its resources are not stored on the Flink TM.
In theory, this could also be fixed on the Flink side by no longer
registering remote/cluster partitions with the partition manager,
but such a fix would only ship with Flink 2.0 at the earliest.
### Why are the changes needed?
`RemoteShuffleResultPartition` is registered with the partition
manager during the setup phase. Because it is a cluster partition (its resources are not
stored on the Flink TM), Flink never triggers its release on the TM.
In a session cluster, the partition object is therefore leaked. More seriously,
because the partition is never released, the idle TM cannot be
reclaimed by the Flink resource manager.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested manually in a session cluster.
Closes #2706 from reswqa/fix-leak.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../plugin/flink/RemoteShuffleResultPartition.java | 18 +++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 18 +++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
.../plugin/flink/RemoteShuffleResultPartition.java | 20 +++++++++++++++++++-
7 files changed, 129 insertions(+), 7 deletions(-)
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index fa8b8692a..286a7677f 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -55,6 +58,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -82,11 +87,22 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
diff --git
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 3e35b788f..e507526f5 100644
---
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -51,6 +54,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -78,11 +83,22 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index eb46a290d..5aa057ae1 100644
---
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -82,11 +87,24 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
outputGate,
(bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it
in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool,
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it
in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
diff --git
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
---
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it
in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
diff --git
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
---
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it
in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 37ff33903..8aaa013c5 100644
---
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -17,6 +17,9 @@
package org.apache.celeborn.plugin.flink;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,8 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
private final RemoteShuffleResultPartitionDelegation delegation;
+ private final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
+
public RemoteShuffleResultPartition(
String owningTaskName,
int partitionIndex,
@@ -79,11 +84,24 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
delegation =
new RemoteShuffleResultPartitionDelegation(
networkBufferSize, outputGate, this::updateStatistics,
numSubpartitions);
+ this.bufferPoolFactory = bufferPoolFactory;
}
@Override
public void setup() throws IOException {
- super.setup();
+ // We can't call the `setup` method of the base class, otherwise it will
cause a partition leak.
+ // The reason is that this partition will be registered to the partition
manager during
+ // `super.setup()`.
+ // Since this is a cluster/remote partition(i.e. resources are not stored
on the Flink TM),
+ // Flink does not trigger the resource releasing over TM. Therefore, the
partition object is
+ // leaked.
+ // So we copy the logic of `setup` but don't register partition to
partition manager.
+ checkState(
+ this.bufferPool == null,
+ "Bug in result partition setup logic: Already registered buffer
pool.");
+ this.bufferPool = checkNotNull(bufferPoolFactory.get());
+ // this is an empty method, but still call it in case of we implement it
in the future.
+ setupInternal();
BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
delegation.setup(
bufferPool, bufferCompressor, this::canBeCompressed,
this::checkInProduceState);