This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e930077b9057aae6f634e8638f38949299881887 Author: Weijie Guo <[email protected]> AuthorDate: Thu Jul 28 16:54:04 2022 +0800 [FLINK-27908] ResultPartition's subclass using setupInternal instead of setup to do initialization work. --- .../runtime/io/network/partition/BufferWritingResultPartition.java | 4 +--- .../apache/flink/runtime/io/network/partition/ResultPartition.java | 4 ++++ .../flink/runtime/io/network/partition/SortMergeResultPartition.java | 4 +--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index 2422f3ad808..8b36a7587d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -94,9 +94,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { } @Override - public void setup() throws IOException { - super.setup(); - + protected void setupInternal() throws IOException { checkState( bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(), "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index ccdf8f4a138..e3b2e2ff74e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -158,9 +158,13 @@ public abstract class ResultPartition implements ResultPartitionWriter { "Bug in result partition setup logic: Already registered buffer pool."); this.bufferPool = checkNotNull(bufferPoolFactory.get()); + setupInternal(); partitionManager.registerResultPartition(this); } + /** Do the subclass's own setup operation. */ + protected abstract void setupInternal() throws IOException; + public String getOwningTaskName() { return owningTaskName; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java index 1620c36beea..a8c678db053 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java @@ -173,7 +173,7 @@ public class SortMergeResultPartition extends ResultPartition { } @Override - public void setup() throws IOException { + protected void setupInternal() throws IOException { synchronized (lock) { if (isReleased()) { throw new IOException("Result partition has been released."); @@ -189,8 +189,6 @@ public class SortMergeResultPartition extends ResultPartition { // initialize the buffer pool eagerly to avoid reporting errors such as OOM too late readBufferPool.initialize(); - super.setup(); - LOG.info("Sort-merge partition {} initialized.", getPartitionId()); }
