This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4aa4507507b60ee4a2b7cc3624a9f36e4d3592ce
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Jan 24 20:16:35 2024 +0800

    [hotfix][runtime] Fix resource releasing and recovery logic in shuffle
---
 .../io/network/partition/ResultPartition.java      | 13 +++++--
 .../partition/UnionResultSubpartitionView.java     |  7 ++++
 .../partition/consumer/SingleInputGate.java        |  4 +-
 .../io/network/partition/ResultPartitionTest.java  | 44 ++++++++++++++++++++++
 .../partition/UnionResultSubpartitionViewTest.java |  9 +++++
 5 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index dfc1df56f60..3e98d6fb8eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -321,11 +321,16 @@ public abstract class ResultPartition implements ResultPartitionWriter {
         } else {
             UnionResultSubpartitionView unionView =
                     new UnionResultSubpartitionView(availabilityListener);
-            for (int i : indexSet.values()) {
-                ResultSubpartitionView view = createSubpartitionView(i, 
unionView);
-                unionView.notifyViewCreated(i, view);
+            try {
+                for (int i : indexSet.values()) {
+                    ResultSubpartitionView view = createSubpartitionView(i, 
unionView);
+                    unionView.notifyViewCreated(i, view);
+                }
+                return unionView;
+            } catch (Exception e) {
+                unionView.releaseAllResources();
+                throw e;
             }
-            return unionView;
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
index e9f87e8dfa5..cede2da3219 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java
@@ -177,6 +177,13 @@ public class UnionResultSubpartitionView
         for (ResultSubpartitionView view : allViews.values()) {
             view.releaseAllResources();
         }
+        allViews.clear();
+
+        for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> tuple2 : 
cachedBuffers) {
+            tuple2.f0.buffer().recycleBuffer();
+        }
+        cachedBuffers.clear();
+
         isReleased = true;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 875ff7c8b71..6993b035e03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -851,6 +851,7 @@ public class SingleInputGate extends IndexedInputGate {
                         case END_OF_DATA:
                             endOfDatas[inputChannel.getChannelIndex()]++;
                             if (endOfDatas[inputChannel.getChannelIndex()] < 
numSubpartitions) {
+                                buffer.get().recycleBuffer();
                                 continue;
                             }
                             break;
@@ -858,6 +859,7 @@ public class SingleInputGate extends IndexedInputGate {
                             endOfPartitions[inputChannel.getChannelIndex()]++;
                             if (endOfPartitions[inputChannel.getChannelIndex()]
                                     < numSubpartitions) {
+                                buffer.get().recycleBuffer();
                                 continue;
                             }
                             break;
@@ -887,7 +889,7 @@ public class SingleInputGate extends IndexedInputGate {
     private Optional<Buffer> readRecoveredOrNormalBuffer(InputChannel 
inputChannel)
             throws IOException, InterruptedException {
         // Firstly, read the buffers from the recovered channel
-        if (inputChannel instanceof RecoveredInputChannel) {
+        if (inputChannel instanceof RecoveredInputChannel && 
!inputChannel.isReleased()) {
             Optional<Buffer> buffer = readBufferFromInputChannel(inputChannel);
             if (!((RecoveredInputChannel) 
inputChannel).getStateConsumedFuture().isDone()) {
                 return buffer;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4dd4c815475..8132f64897d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 
@@ -880,6 +882,33 @@ class ResultPartitionTest {
         
assertThat(resultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(0);
     }
 
+    @Test
+    void testReleaseAllResourcesAtFailure() {
+        final int maxNumSubpartitions = 4;
+        final ResultSubpartitionIndexSet indexSet =
+                new ResultSubpartitionIndexSet(0, maxNumSubpartitions);
+        final BufferAvailabilityListener availabilityListener = 
(ResultSubpartitionView view) -> {};
+
+        for (int numSubpartitions = 1; numSubpartitions < maxNumSubpartitions; 
numSubpartitions++) {
+            List<ResultSubpartitionView> views = new ArrayList<>();
+            for (int i = 0; i < numSubpartitions; i++) {
+                views.add(new NoOpResultSubpartitionViewWithReleaseListener());
+            }
+
+            ResultPartition partition =
+                    TestingResultPartition.newBuilder()
+                            .setCreateSubpartitionViewFunction(
+                                    (index, listener) -> views.get(index))
+                            .build();
+
+            assertThatThrownBy(
+                            () -> partition.createSubpartitionView(indexSet, 
availabilityListener))
+                    .isInstanceOf(IndexOutOfBoundsException.class);
+
+            assertThat(views).allMatch(ResultSubpartitionView::isReleased);
+        }
+    }
+
     @NotNull
     private BufferBuilder getFinishedBufferBuilder(
             PipelinedResultPartition resultPartition, int bufferSize) throws 
Exception {
@@ -888,4 +917,19 @@ class ResultPartitionTest {
         bufferBuilder.finish();
         return bufferBuilder;
     }
+
+    private static class NoOpResultSubpartitionViewWithReleaseListener
+            extends NoOpResultSubpartitionView {
+        private boolean isReleased = false;
+
+        @Override
+        public void releaseAllResources() {
+            isReleased = true;
+        }
+
+        @Override
+        public boolean isReleased() {
+            return isReleased;
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
index 737a8cb294e..9649dd6d501 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionViewTest.java
@@ -123,12 +123,19 @@ class UnionResultSubpartitionViewTest {
         assertThat(view.isReleased()).isFalse();
         assertThat(view0.isReleased()).isFalse();
         assertThat(view1.isReleased()).isFalse();
+        assertThat(buffers0).allMatch(x -> !x.isRecycled());
+        assertThat(buffers1).allMatch(x -> !x.isRecycled());
+
+        // Verifies that cached buffers are also recycled.
+        view0.notifyDataAvailable();
 
         view.releaseAllResources();
 
         assertThat(view.isReleased()).isTrue();
         assertThat(view0.isReleased()).isTrue();
         assertThat(view1.isReleased()).isTrue();
+        assertThat(buffers0).allMatch(Buffer::isRecycled);
+        assertThat(buffers1).allMatch(Buffer::isRecycled);
     }
 
     private static class TestingResultSubpartitionView extends 
NoOpResultSubpartitionView {
@@ -174,6 +181,8 @@ class UnionResultSubpartitionViewTest {
 
         @Override
         public void releaseAllResources() {
+            buffers.forEach(Buffer::recycleBuffer);
+            buffers.clear();
             isReleased = true;
         }
 

Reply via email to