This is an automated email from the ASF dual-hosted git repository.
yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 08f98b7 [FLINK-23724][network] Fix the network buffer leak when
ResultPartition is released (#16844)
08f98b7 is described below
commit 08f98b748a203097064028e0ea066939d63b18f0
Author: caoyingjie <[email protected]>
AuthorDate: Fri Aug 20 11:10:45 2021 +0800
[FLINK-23724][network] Fix the network buffer leak when ResultPartition is
released (#16844)
* [FLINK-23724][network][refactor] Make TaskCanceler call
ResultPartitionWriter#fail instead of ResultPartitionWriter#close
Originally, the TaskCanceler calls the ResultPartitionWriter#close method
to early release input and output buffer pools. However, the
ResultPartitionWriter#close method can also be called by the task thread to
release other network resources which may lead to race conditions. This patch
makes TaskCanceler call ResultPartitionWriter#fail instead of
ResultPartitionWriter#close and close the output buffer pool in
ResultPartitionWriter#fail which avoids the potential race conditions.
This closes #16844.
* [FLINK-23724][network] Fix the network buffer leak when ResultPartition
is released
This patch fixes the network buffer leak issue by closing all
BufferBuilders in the BufferWritingResultPartition#close method.
This closes #16844.
---
.../partition/BufferWritingResultPartition.java | 17 ++++++++
.../io/network/partition/ResultPartition.java | 10 ++++-
.../org/apache/flink/runtime/taskmanager/Task.java | 46 ++++++++++++----------
.../io/network/partition/ResultPartitionTest.java | 39 ++++++------------
4 files changed, 64 insertions(+), 48 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index b4f2a1e..81e5a72 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -235,6 +235,23 @@ public abstract class BufferWritingResultPartition extends
ResultPartition {
}
}
+ @Override
+ public void close() {
+ // We can not close these buffers in the release method because of the
potential race
+ // condition. This close method will be only called from the Task
thread itself.
+ if (broadcastBufferBuilder != null) {
+ broadcastBufferBuilder.close();
+ broadcastBufferBuilder = null;
+ }
+ for (int i = 0; i < unicastBufferBuilders.length; ++i) {
+ if (unicastBufferBuilders[i] != null) {
+ unicastBufferBuilders[i].close();
+ unicastBufferBuilders[i] = null;
+ }
+ }
+ super.close();
+ }
+
private BufferBuilder appendUnicastDataForNewRecord(
final ByteBuffer record, final int targetSubpartition) throws
IOException {
if (targetSubpartition < 0 || targetSubpartition >
unicastBufferBuilders.length) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index dc6365c..90d0d92 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -250,15 +250,21 @@ public abstract class ResultPartition implements
ResultPartitionWriter {
/** Releases all produced data including both those stored in memory and
persisted on disk. */
protected abstract void releaseInternal();
- @Override
- public void close() {
+ private void closeBufferPool() {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
}
@Override
+ public void close() {
+ closeBufferPool();
+ }
+
+ @Override
public void fail(@Nullable Throwable throwable) {
+ // the task canceler thread will call this method to early release the
output buffer pool
+ closeBufferPool();
partitionManager.releasePartition(partitionId, throwable);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b0891f8..bde0ca6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -972,12 +972,15 @@ public class Task
for (ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
- if (isCanceledOrFailed()) {
- partitionWriter.fail(getFailureCause());
- }
}
- closeNetworkResources();
+ // close network resources
+ if (isCanceledOrFailed()) {
+ failAllResultPartitions();
+ }
+ closeAllResultPartitions();
+ closeAllInputGates();
+
try {
taskStateManager.close();
} catch (Exception e) {
@@ -985,11 +988,13 @@ public class Task
}
}
- /**
- * There are two scenarios to close the network resources. One is from
{@link TaskCanceler} to
- * early release partitions and gates. Another is from task thread during
task exiting.
- */
- private void closeNetworkResources() {
+ private void failAllResultPartitions() {
+ for (ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
+ partitionWriter.fail(getFailureCause());
+ }
+ }
+
+ private void closeAllResultPartitions() {
for (ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
try {
partitionWriter.close();
@@ -999,14 +1004,14 @@ public class Task
"Failed to release result partition for task {}.",
taskNameWithSubtask, t);
}
}
+ }
+ private void closeAllInputGates() {
AbstractInvokable invokable = this.invokable;
if (invokable == null || !invokable.isUsingNonBlockingInput()) {
- // Cleanup resources instead of invokable if it is null,
- // or prevent it from being blocked on input,
- // or interrupt if it is already blocked.
- // Not needed for StreamTask (which does NOT use blocking input);
for which this could
- // cause race conditions
+ // Cleanup resources instead of invokable if it is null, or
prevent it from being
+ // blocked on input, or interrupt if it is already blocked. Not
needed for StreamTask
+ // (which does NOT use blocking input); for which this could cause
race conditions
for (InputGate inputGate : inputGates) {
try {
inputGate.close();
@@ -1182,7 +1187,6 @@ public class Task
Runnable canceler =
new TaskCanceler(
LOG,
- this::closeNetworkResources,
taskCancellationTimeout > 0
? taskCancellationTimeout
:
TaskManagerOptions.TASK_CANCELLATION_TIMEOUT
@@ -1588,10 +1592,9 @@ public class Task
* This runner calls cancel() on the invokable, closes input-/output
resources, and initially
* interrupts the task thread.
*/
- private static class TaskCanceler implements Runnable {
+ private class TaskCanceler implements Runnable {
private final Logger logger;
- private final Runnable networkResourcesCloser;
/** Time to wait after cancellation and interruption before releasing
network resources. */
private final long taskCancellationTimeout;
@@ -1601,13 +1604,11 @@ public class Task
TaskCanceler(
Logger logger,
- Runnable networkResourcesCloser,
long taskCancellationTimeout,
AbstractInvokable invokable,
Thread executer,
String taskName) {
this.logger = logger;
- this.networkResourcesCloser = networkResourcesCloser;
this.taskCancellationTimeout = taskCancellationTimeout;
this.invokable = invokable;
this.executer = executer;
@@ -1640,7 +1641,12 @@ public class Task
// in order to unblock async Threads, which produce/consume the
// intermediate streams outside of the main Task Thread (like
// the Kafka consumer).
- networkResourcesCloser.run();
+ // Notes: 1) This does not mean to release all network
resources,
+ // the task thread itself will release them; 2) We can not
close
+ // ResultPartitions here because of possible race conditions
with
+ // Task thread so we just call the fail here.
+ failAllResultPartitions();
+ closeAllInputGates();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index b299edc..7feb002 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -343,50 +343,37 @@ public class ResultPartitionTest {
}
}
- @Test
- public void testReleaseMemoryOnPipelinedPartition() throws Exception {
- testReleaseMemory(ResultPartitionType.PIPELINED);
- }
-
/**
- * Tests {@link ResultPartition#releaseMemory(int)} on a working partition.
- *
- * @param resultPartitionType the result partition type to set up
+ * Tests {@link ResultPartition#close()} and {@link
ResultPartition#release()} on a working
+ * pipelined partition.
*/
- private void testReleaseMemory(final ResultPartitionType
resultPartitionType) throws Exception {
+ @Test
+ public void testReleaseMemoryOnPipelinedPartition() throws Exception {
final int numAllBuffers = 10;
final NettyShuffleEnvironment network =
new NettyShuffleEnvironmentBuilder()
.setNumNetworkBuffers(numAllBuffers)
.setBufferSize(bufferSize)
.build();
- final ResultPartition resultPartition = createPartition(network,
resultPartitionType, 1);
+ final ResultPartition resultPartition =
+ createPartition(network, ResultPartitionType.PIPELINED, 1);
try {
resultPartition.setup();
// take all buffers (more than the minimum required)
for (int i = 0; i < numAllBuffers; ++i) {
- resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize -
1), 0);
}
- resultPartition.finish();
-
assertEquals(0,
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
- // reset the pool size less than the number of requested buffers
- final int numLocalBuffers = 4;
- resultPartition.getBufferPool().setNumBuffers(numLocalBuffers);
+ resultPartition.close();
+ assertTrue(resultPartition.getBufferPool().isDestroyed());
+ assertEquals(
+ numAllBuffers,
network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
- // partition with blocking type should release excess buffers
- if (!resultPartitionType.hasBackPressure()) {
- assertEquals(
- numLocalBuffers,
-
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
- } else {
- assertEquals(
- 0,
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
- }
- } finally {
resultPartition.release();
+ assertEquals(0,
network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
+ } finally {
network.close();
}
}