This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 828b3a58aca91e21b2fd328448b977d460e6e369
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jul 28 18:31:48 2022 +0800

    [FLINK-27908] Extends onResultPartitionClosed to HsSpillingStrategy.
---
 .../partition/hybrid/HsFullSpillingStrategy.java   | 20 +++++++++++++
 .../hybrid/HsSelectiveSpillingStrategy.java        | 20 +++++++++++++
 .../partition/hybrid/HsSpillingStrategy.java       | 15 ++++++++++
 .../hybrid/HsFullSpillingStrategyTest.java         | 33 ++++++++++++++++++++++
 .../hybrid/HsSelectiveSpillingStrategyTest.java    | 32 +++++++++++++++++++++
 .../partition/hybrid/TestingSpillingStrategy.java  | 23 +++++++++++++--
 6 files changed, 141 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index bc2889627d8..cfc737d2efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -79,6 +79,26 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
         return builder.build();
     }
 
+    @Override
+    public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
+        Decision.Builder builder = Decision.builder();
+        for (int subpartitionId = 0;
+                subpartitionId < spillingInfoProvider.getNumSubpartitions();
+                subpartitionId++) {
+            builder.addBufferToSpill(
+                            subpartitionId,
+                            // get all not start spilling buffers.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.NOT_SPILL, ConsumeStatus.ALL))
+                    .addBufferToRelease(
+                            subpartitionId,
+                            // get all not released buffers.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.ALL, ConsumeStatus.ALL));
+        }
+        return builder.build();
+    }
+
     private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) {
         if (spillingInfoProvider.getNumTotalUnSpillBuffers() < numBuffersTriggerSpilling) {
             // In case situation changed since onBufferFinished() returns Optional#empty()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index 3f173e50f2a..dcd53393bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -102,4 +102,24 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
                 });
         return builder.build();
     }
+
+    @Override
+    public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
+        Decision.Builder builder = Decision.builder();
+        for (int subpartitionId = 0;
+                subpartitionId < spillingInfoProvider.getNumSubpartitions();
+                subpartitionId++) {
+            builder.addBufferToSpill(
+                            subpartitionId,
+                            // get all not start spilling buffers.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.NOT_SPILL, ConsumeStatus.ALL))
+                    .addBufferToRelease(
+                            subpartitionId,
+                            // get all not released buffers.
+                            spillingInfoProvider.getBuffersInOrder(
+                                    subpartitionId, SpillStatus.ALL, ConsumeStatus.ALL));
+        }
+        return builder.build();
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index 4525b8fd442..fb4a5ab58d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -71,6 +71,15 @@ public interface HsSpillingStrategy {
      */
     Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider);
 
+    /**
+     * Make a decision when result partition is closed. Because this method will directly touch the
+     * {@link HsSpillingInfoProvider}, the caller should take care of the thread safety.
+     *
+     * @param spillingInfoProvider that provides information about the current status.
+     * @return A {@link Decision} based on the global information.
+     */
+    Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider);
+
     /**
      * This class represents the spill and release decision made by {@link HsSpillingStrategy}, in
      * other words, which data is to be spilled and which data is to be released.
@@ -143,6 +152,12 @@ public interface HsSpillingStrategy {
                 return this;
             }
 
+            public Builder addBufferToRelease(
+                    int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
+                bufferToRelease.computeIfAbsent(subpartitionId, ArrayList::new).addAll(buffers);
+                return this;
+            }
+
             public Decision build() {
                 return new Decision(bufferToSpill, bufferToRelease);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index b8d1289efb8..44701cbab76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -182,4 +183,36 @@ class HsFullSpillingStrategyTest {
                 .extractingByKey(subpartitionId)
                 .satisfies((buffers) -> assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer));
     }
+
+    @Test
+    void testOnResultPartitionClosed() {
+        final int subpartition1 = 0;
+        final int subpartition2 = 1;
+
+        List<BufferIndexAndChannel> subpartitionBuffer1 =
+                createBufferIndexAndChannelsList(subpartition1, 0, 1, 2, 3);
+        List<BufferIndexAndChannel> subpartitionBuffer2 =
+                createBufferIndexAndChannelsList(subpartition2, 0, 1, 2);
+        TestingSpillingInfoProvider spillInfoProvider =
+                TestingSpillingInfoProvider.builder()
+                        .setGetNumSubpartitionsSupplier(() -> 2)
+                        .addSubpartitionBuffers(subpartition1, subpartitionBuffer1)
+                        .addSubpartitionBuffers(subpartition2, subpartitionBuffer2)
+                        .addSpillBuffers(subpartition1, Arrays.asList(2, 3))
+                        .addConsumedBuffers(subpartition1, Collections.singletonList(0))
+                        .addSpillBuffers(subpartition2, Collections.singletonList(2))
+                        .build();
+
+        Decision decision = spillStrategy.onResultPartitionClosed(spillInfoProvider);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<>();
+        expectedToSpillBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 2));
+        expectedToSpillBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 2));
+        assertThat(decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<>();
+        expectedToReleaseBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 4));
+        expectedToReleaseBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 3));
+        assertThat(decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
index 6862c4c6e4a..dfeb7a0f425 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
@@ -133,4 +133,36 @@ class HsSelectiveSpillingStrategyTest {
         assertThat(globalDecision.getBufferToSpill()).isEqualTo(expectedBuffers);
         assertThat(globalDecision.getBufferToRelease()).isEqualTo(expectedBuffers);
     }
+
+    @Test
+    void testOnResultPartitionClosed() {
+        final int subpartition1 = 0;
+        final int subpartition2 = 1;
+
+        List<BufferIndexAndChannel> subpartitionBuffer1 =
+                createBufferIndexAndChannelsList(subpartition1, 0, 1, 2, 3);
+        List<BufferIndexAndChannel> subpartitionBuffer2 =
+                createBufferIndexAndChannelsList(subpartition2, 0, 1, 2);
+        TestingSpillingInfoProvider spillInfoProvider =
+                TestingSpillingInfoProvider.builder()
+                        .setGetNumSubpartitionsSupplier(() -> 2)
+                        .addSubpartitionBuffers(subpartition1, subpartitionBuffer1)
+                        .addSubpartitionBuffers(subpartition2, subpartitionBuffer2)
+                        .addSpillBuffers(subpartition1, Arrays.asList(2, 3))
+                        .addConsumedBuffers(subpartition1, Collections.singletonList(0))
+                        .addSpillBuffers(subpartition2, Collections.singletonList(2))
+                        .build();
+
+        Decision decision = spillStrategy.onResultPartitionClosed(spillInfoProvider);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<>();
+        expectedToSpillBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 2));
+        expectedToSpillBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 2));
+        assertThat(decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers);
+
+        Map<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<>();
+        expectedToReleaseBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 4));
+        expectedToReleaseBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 3));
+        assertThat(decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
index 4cce53db0a9..61a730fc5c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java
@@ -32,15 +32,19 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
 
     private final Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction;
 
+    private final Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction;
+
     private TestingSpillingStrategy(
             BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction,
             Function<Integer, Optional<Decision>> onBufferFinishedFunction,
             Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction,
-            Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction) {
+            Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction,
+            Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) {
         this.onMemoryUsageChangedFunction = onMemoryUsageChangedFunction;
         this.onBufferFinishedFunction = onBufferFinishedFunction;
         this.onBufferConsumedFunction = onBufferConsumedFunction;
         this.decideActionWithGlobalInfoFunction = decideActionWithGlobalInfoFunction;
+        this.onResultPartitionClosedFunction = onResultPartitionClosedFunction;
     }
 
     @Override
@@ -64,6 +68,11 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
         return decideActionWithGlobalInfoFunction.apply(spillingInfoProvider);
     }
 
+    @Override
+    public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
+        return onResultPartitionClosedFunction.apply(spillingInfoProvider);
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -82,6 +91,9 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
         private Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction =
                 (ignore) -> Decision.NO_ACTION;
 
+        private Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction =
+                (ignore) -> Decision.NO_ACTION;
+
         private Builder() {}
 
         public Builder setOnMemoryUsageChangedFunction(
@@ -108,12 +120,19 @@ public class TestingSpillingStrategy implements HsSpillingStrategy {
             return this;
         }
 
+        public Builder setOnResultPartitionClosedFunction(
+                Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) {
+            this.onResultPartitionClosedFunction = onResultPartitionClosedFunction;
+            return this;
+        }
+
         public TestingSpillingStrategy build() {
             return new TestingSpillingStrategy(
                     onMemoryUsageChangedFunction,
                     onBufferFinishedFunction,
                     onBufferConsumedFunction,
-                    decideActionWithGlobalInfoFunction);
+                    decideActionWithGlobalInfoFunction,
+                    onResultPartitionClosedFunction);
         }
     }
 }

Reply via email to