This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4aa4507507b60ee4a2b7cc3624a9f36e4d3592ce Author: yunfengzhou-hub <[email protected]> AuthorDate: Wed Jan 24 20:16:35 2024 +0800 [hotfix][runtime] Fix resource releasing and recovery logic in shuffle --- .../io/network/partition/ResultPartition.java | 13 +++++-- .../partition/UnionResultSubpartitionView.java | 7 ++++ .../partition/consumer/SingleInputGate.java | 4 +- .../io/network/partition/ResultPartitionTest.java | 44 ++++++++++++++++++++++ .../partition/UnionResultSubpartitionViewTest.java | 9 +++++ 5 files changed, 72 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index dfc1df56f60..3e98d6fb8eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -321,11 +321,16 @@ public abstract class ResultPartition implements ResultPartitionWriter { } else { UnionResultSubpartitionView unionView = new UnionResultSubpartitionView(availabilityListener); - for (int i : indexSet.values()) { - ResultSubpartitionView view = createSubpartitionView(i, unionView); - unionView.notifyViewCreated(i, view); + try { + for (int i : indexSet.values()) { + ResultSubpartitionView view = createSubpartitionView(i, unionView); + unionView.notifyViewCreated(i, view); + } + return unionView; + } catch (Exception e) { + unionView.releaseAllResources(); + throw e; } - return unionView; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java index e9f87e8dfa5..cede2da3219 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java @@ -177,6 +177,13 @@ public class UnionResultSubpartitionView for (ResultSubpartitionView view : allViews.values()) { view.releaseAllResources(); } + allViews.clear(); + + for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> tuple2 : cachedBuffers) { + tuple2.f0.buffer().recycleBuffer(); + } + cachedBuffers.clear(); + isReleased = true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 875ff7c8b71..6993b035e03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -851,6 +851,7 @@ public class SingleInputGate extends IndexedInputGate { case END_OF_DATA: endOfDatas[inputChannel.getChannelIndex()]++; if (endOfDatas[inputChannel.getChannelIndex()] < numSubpartitions) { + buffer.get().recycleBuffer(); continue; } break; @@ -858,6 +859,7 @@ public class SingleInputGate extends IndexedInputGate { endOfPartitions[inputChannel.getChannelIndex()]++; if (endOfPartitions[inputChannel.getChannelIndex()] < numSubpartitions) { + buffer.get().recycleBuffer(); continue; } break; @@ -887,7 +889,7 @@ public class SingleInputGate extends IndexedInputGate { private Optional<Buffer> readRecoveredOrNormalBuffer(InputChannel inputChannel) throws IOException, InterruptedException { // Firstly, read the buffers from the recovered channel - if (inputChannel instanceof RecoveredInputChannel) { + if (inputChannel instanceof RecoveredInputChannel && !inputChannel.isReleased()) { Optional<Buffer> buffer = readBufferFromInputChannel(inputChannel); if (!((RecoveredInputChannel) inputChannel).getStateConsumedFuture().isDone()) { return buffer; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 4dd4c815475..8132f64897d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -41,6 +41,8 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -880,6 +882,33 @@ class ResultPartitionTest { assertThat(resultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(0); } + @Test + void testReleaseAllResourcesAtFailure() { + final int maxNumSubpartitions = 4; + final ResultSubpartitionIndexSet indexSet = + new ResultSubpartitionIndexSet(0, maxNumSubpartitions); + final BufferAvailabilityListener availabilityListener = (ResultSubpartitionView view) -> {}; + + for (int numSubpartitions = 1; numSubpartitions < maxNumSubpartitions; numSubpartitions++) { + List<ResultSubpartitionView> views = new ArrayList<>(); + for (int i = 0; i < numSubpartitions; i++) { + views.add(new NoOpResultSubpartitionViewWithReleaseListener()); + } + + ResultPartition partition = + TestingResultPartition.newBuilder() + .setCreateSubpartitionViewFunction( + (index, listener) -> views.get(index)) + .build(); + + assertThatThrownBy( + () -> partition.createSubpartitionView(indexSet, availabilityListener)) + .isInstanceOf(IndexOutOfBoundsException.class); + + assertThat(views).allMatch(ResultSubpartitionView::isReleased); + } + } + @NotNull private BufferBuilder getFinishedBufferBuilder( PipelinedResultPartition resultPartition, int bufferSize) throws Exception { @@ -888,4 +917,19 @@ class ResultPartitionTest { bufferBuilder.finish(); return bufferBuilder; } + + private static class NoOpResultSubpartitionViewWithReleaseListener + extends NoOpResultSubpartitionView { + private boolean isReleased = false; + + @Override + public void releaseAllResources() { + isReleased = true; + } + + @Override + public boolean isReleased() { + return isReleased; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java index 737a8cb294e..9649dd6d501 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java @@ -123,12 +123,19 @@ class UnionResultSubpartitionViewTest { assertThat(view.isReleased()).isFalse(); assertThat(view0.isReleased()).isFalse(); assertThat(view1.isReleased()).isFalse(); + assertThat(buffers0).allMatch(x -> !x.isRecycled()); + assertThat(buffers1).allMatch(x -> !x.isRecycled()); + + // Verifies that cached buffers are also recycled. + view0.notifyDataAvailable(); view.releaseAllResources(); assertThat(view.isReleased()).isTrue(); assertThat(view0.isReleased()).isTrue(); assertThat(view1.isReleased()).isTrue(); + assertThat(buffers0).allMatch(Buffer::isRecycled); + assertThat(buffers1).allMatch(Buffer::isRecycled); } private static class TestingResultSubpartitionView extends NoOpResultSubpartitionView { @@ -174,6 +181,8 @@ class UnionResultSubpartitionViewTest { @Override public void releaseAllResources() { + buffers.forEach(Buffer::recycleBuffer); + buffers.clear(); isReleased = true; }
