wsry commented on a change in pull request #16844:
URL: https://github.com/apache/flink/pull/16844#discussion_r690857837
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -250,15 +250,20 @@ public void release(Throwable cause) {
/** Releases all produced data including both those stored in memory and persisted on disk. */
protected abstract void releaseInternal();
- @Override
- public void close() {
+ private void closeBufferPool() {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
}
+ @Override
+ public void close() {
+ closeBufferPool();
+ }
+
@Override
public void fail(@Nullable Throwable throwable) {
+ closeBufferPool();
Review comment:
As I understand it, the close method of the result partition is always called by the task thread to release the network resources, regardless of success or failure. I think that is enough for releasing network resources. I guess the only reason to also call the close method from the TaskCanceler thread is the one the comment gives:
>>> // Early release of input and output buffer pools. We do this
>>> // in order to unblock async Threads, which produce/consume the
>>> // intermediate streams outside of the main Task Thread (like
>>> // the Kafka consumer).
I did not look into this further and simply moved the BufferPool close operation into the fail method to avoid breaking this behavior.
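To make the early-release argument concrete, here is a minimal sketch under simplified assumptions: `SketchBufferPool` and `SketchResultPartition` are hypothetical illustrations, not Flink types, and `destroy()` only stands in for `lazyDestroy()`. Because destroying the pool is idempotent, a canceler thread can call `fail` early and the task thread can still call `close` afterwards; a producer blocked waiting for a buffer is woken up and fails fast instead of hanging.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a network buffer pool; not Flink's BufferPool API. */
class SketchBufferPool {
    private final Deque<byte[]> available = new ArrayDeque<>();
    private boolean destroyed;
    private int destroyCalls;

    SketchBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Blocks until a buffer is free; fails fast once the pool has been destroyed. */
    synchronized byte[] requestBufferBlocking() throws InterruptedException {
        while (available.isEmpty() && !destroyed) {
            wait(); // only destroy() wakes waiters in this sketch (recycling is not modeled)
        }
        if (destroyed) {
            throw new IllegalStateException("Buffer pool is destroyed.");
        }
        return available.poll();
    }

    /** Idempotent: only the first call has an effect; it wakes up all blocked requesters. */
    synchronized void destroy() {
        destroyCalls++;
        if (!destroyed) {
            destroyed = true;
            available.clear();
            notifyAll();
        }
    }

    synchronized boolean isDestroyed() {
        return destroyed;
    }

    synchronized int destroyCalls() {
        return destroyCalls;
    }
}

/** Hypothetical result partition mirroring the closeBufferPool() refactoring in the diff. */
class SketchResultPartition {
    private final SketchBufferPool bufferPool;

    SketchResultPartition(SketchBufferPool bufferPool) {
        this.bufferPool = bufferPool;
    }

    private void closeBufferPool() {
        if (bufferPool != null) {
            bufferPool.destroy();
        }
    }

    /** Called by the task thread on every exit path, success or failure. */
    public void close() {
        closeBufferPool();
    }

    /** May be called early from a canceler thread to unblock producers waiting for buffers. */
    public void fail(Throwable cause) {
        closeBufferPool();
    }
}
```

In this sketch the second `destroy()` call is a no-op, which is what lets both threads go through `closeBufferPool()` without coordinating with each other.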
As for the fail method, I think it is more like another release method, one that can be called without knowing the partition ID.
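A minimal sketch of this "release without knowing the partition ID" reading, assuming a hypothetical `SketchPartitionManager` keyed by string IDs (it is not Flink's `ResultPartitionManager`): the partition knows its own ID, so `fail(cause)` can simply delegate to the manager's release path, and the caller only needs the partition object.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry of produced partitions keyed by ID; not Flink's ResultPartitionManager. */
class SketchPartitionManager {
    private final Map<String, SketchPartition> partitions = new HashMap<>();
    private final Map<String, Throwable> releaseCauses = new HashMap<>();

    void register(SketchPartition partition) {
        partitions.put(partition.id(), partition);
    }

    /** Unregisters the partition and drops its data; the cause may be null. */
    void releasePartition(String partitionId, Throwable cause) {
        SketchPartition partition = partitions.remove(partitionId);
        if (partition != null) {
            releaseCauses.put(partitionId, cause);
            partition.releaseInternal();
        }
    }

    boolean isRegistered(String partitionId) {
        return partitions.containsKey(partitionId);
    }

    Throwable releaseCause(String partitionId) {
        return releaseCauses.get(partitionId);
    }
}

/** Hypothetical partition that knows its own ID, so callers of fail() do not need it. */
class SketchPartition {
    private final String id;
    private final SketchPartitionManager manager;
    private boolean dataReleased;

    SketchPartition(String id, SketchPartitionManager manager) {
        this.id = id;
        this.manager = manager;
    }

    String id() {
        return id;
    }

    boolean isDataReleased() {
        return dataReleased;
    }

    /** Drops the produced data (in a real partition: both in-memory and on-disk data). */
    void releaseInternal() {
        dataReleased = true;
    }

    /** Release-like: delegates to the manager using the partition's own ID. */
    void fail(Throwable cause) {
        manager.releasePartition(id, cause);
    }
}
```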
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -972,24 +972,28 @@ private void releaseResources() {
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
- if (isCanceledOrFailed()) {
- partitionWriter.fail(getFailureCause());
- }
}
- closeNetworkResources();
+ if (isCanceledOrFailed()) {
+ failAllResultPartitions();
+ }
+ closeAllResultPartitions();
Review comment:
As I understand it, the fail method is more like release: it means the result partition can no longer be consumed. The close method, by contrast, closes all network resources (the network buffers). A ResultPartition can still be consumed after its network buffers have been recycled, for example a blocking partition.
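A minimal sketch of this distinction, assuming a hypothetical `BlockingPartitionSketch` whose "persisted" data is just an in-memory list, and a hypothetical `TaskReleaseSketch.releaseResources` that mirrors the ordering in the diff (fail all partitions if the task was canceled or failed, then close all of them). After `close()` only the buffers are gone, so a successfully finished blocking partition can still be consumed; after `fail()` it cannot.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical blocking partition; the "persisted" data is an in-memory list for illustration. */
class BlockingPartitionSketch {
    private final List<String> persisted = new ArrayList<>();
    private boolean buffersRecycled; // set by close(): network buffers handed back
    private boolean dataReleased; // set by fail(): produced data dropped

    void emit(String record) {
        if (buffersRecycled) {
            throw new IllegalStateException("Partition is closed for writing.");
        }
        persisted.add(record);
    }

    /** Recycles network buffers only; the persisted data stays consumable. */
    void close() {
        buffersRecycled = true;
    }

    /** Release-like: after this the partition can no longer be consumed. */
    void fail(Throwable cause) {
        dataReleased = true;
        persisted.clear();
    }

    boolean buffersRecycled() {
        return buffersRecycled;
    }

    boolean isConsumable() {
        return !dataReleased;
    }

    List<String> consume() {
        if (!isConsumable()) {
            throw new IllegalStateException("Partition has been released.");
        }
        return new ArrayList<>(persisted);
    }
}

/** Hypothetical mirror of the fail-then-close ordering in the Task#releaseResources diff. */
final class TaskReleaseSketch {
    private TaskReleaseSketch() {}

    static void releaseResources(
            List<BlockingPartitionSketch> partitions,
            boolean canceledOrFailed,
            Throwable failureCause) {
        if (canceledOrFailed) {
            for (BlockingPartitionSketch partition : partitions) {
                partition.fail(failureCause);
            }
        }
        // Network buffers are released in every case, success or failure.
        for (BlockingPartitionSketch partition : partitions) {
            partition.close();
        }
    }
}
```

The explicit `canceledOrFailed` flag is a simplification standing in for `isCanceledOrFailed()`, since a canceled task does not necessarily carry a failure cause.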
--
This is an automated message from the Apache Git Service.