This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7f338e87a777abd102719dcb772b184770b62b88
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Oct 19 19:44:16 2022 +0800

    [FLINK-28889] Full spilling strategy no longer considers consuming 
progress.
---
 .../partition/hybrid/HsFullSpillingStrategy.java   | 58 ++++++++--------------
 .../hybrid/HsFullSpillingStrategyTest.java         | 39 +--------------
 2 files changed, 21 insertions(+), 76 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index f0339ee152a..ac2d86fcc30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -22,14 +22,10 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvid
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
 
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Deque;
-import java.util.List;
 import java.util.Optional;
 import java.util.TreeMap;
 
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder;
-
 /** A special implementation of {@link HsSpillingStrategy} that spilled all 
buffers to disk. */
 public class HsFullSpillingStrategy implements HsSpillingStrategy {
     private final float numBuffersTriggerSpillingRatio;
@@ -118,6 +114,11 @@ public class HsFullSpillingStrategy implements 
HsSpillingStrategy {
         }
     }
 
+    /**
+     * Release each subpartition's spilled buffers from the head. Each 
subpartition fairly retains
+     * a fixed number of buffers, and all the remaining buffers are released. 
If a subpartition
+     * does not have that many qualified buffers, all of them will be retained.
+     */
     private void checkRelease(
             HsSpillingInfoProvider spillingInfoProvider, int poolSize, 
Decision.Builder builder) {
         if (spillingInfoProvider.getNumTotalRequestedBuffers() < poolSize * 
releaseThreshold) {
@@ -125,47 +126,28 @@ public class HsFullSpillingStrategy implements 
HsSpillingStrategy {
             return;
         }
 
-        int releaseNum = (int) (spillingInfoProvider.getPoolSize() * 
releaseBufferRatio);
+        int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio);
+        int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
+        int subpartitionSurvivedNum = survivedNum / numSubpartitions;
 
-        // first, release all consumed buffers
-        TreeMap<Integer, Deque<BufferIndexAndChannel>> 
consumedBuffersToRelease = new TreeMap<>();
-        int numConsumedBuffers = 0;
-        for (int subpartitionId = 0;
-                subpartitionId < spillingInfoProvider.getNumSubpartitions();
-                subpartitionId++) {
+        TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new 
TreeMap<>();
 
-            Deque<BufferIndexAndChannel> consumedSpillSubpartitionBuffers =
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
subpartitionId++) {
+            Deque<BufferIndexAndChannel> buffersInOrder =
                     spillingInfoProvider.getBuffersInOrder(
-                            subpartitionId, SpillStatus.SPILL, 
ConsumeStatus.CONSUMED);
-            numConsumedBuffers += consumedSpillSubpartitionBuffers.size();
-            consumedBuffersToRelease.put(subpartitionId, 
consumedSpillSubpartitionBuffers);
-        }
-
-        // make up the releaseNum with unconsumed buffers, if needed, w.r.t. 
the consuming priority
-        TreeMap<Integer, List<BufferIndexAndChannel>> 
unconsumedBufferToRelease = new TreeMap<>();
-        if (releaseNum > numConsumedBuffers) {
-            TreeMap<Integer, Deque<BufferIndexAndChannel>> unconsumedBuffers = 
new TreeMap<>();
-            for (int subpartitionId = 0;
-                    subpartitionId < 
spillingInfoProvider.getNumSubpartitions();
-                    subpartitionId++) {
-                unconsumedBuffers.put(
-                        subpartitionId,
-                        spillingInfoProvider.getBuffersInOrder(
-                                subpartitionId, SpillStatus.SPILL, 
ConsumeStatus.NOT_CONSUMED));
+                            subpartitionId, SpillStatus.SPILL, 
ConsumeStatus.ALL);
+            // if the number of subpartition buffers is less than the number 
of survived buffers,
+            // reserve all of them.
+            int releaseNum = Math.max(0, buffersInOrder.size() - 
subpartitionSurvivedNum);
+            while (releaseNum-- != 0) {
+                buffersInOrder.pollLast();
             }
-            unconsumedBufferToRelease.putAll(
-                    getBuffersByConsumptionPriorityInOrder(
-                            spillingInfoProvider.getNextBufferIndexToConsume(),
-                            unconsumedBuffers,
-                            releaseNum - numConsumedBuffers));
+            bufferToRelease.put(subpartitionId, buffersInOrder);
         }
 
         // collect results in order
-        for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) {
-            List<BufferIndexAndChannel> toRelease = new ArrayList<>();
-            toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new 
ArrayDeque<>()));
-            toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new 
ArrayList<>()));
-            builder.addBufferToRelease(i, toRelease);
+        for (int i = 0; i < numSubpartitions; i++) {
+            builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new 
ArrayDeque<>()));
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index 3602b15b2d1..e88918b2c0d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -32,7 +32,6 @@ import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBufferIndexAndChannelsList;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
 
 /** Tests for {@link HsFullSpillingStrategy}. */
 class HsFullSpillingStrategyTest {
@@ -129,9 +128,7 @@ class HsFullSpillingStrategyTest {
                         .addSubpartitionBuffers(subpartition1, 
subpartitionBuffers1)
                         .addSubpartitionBuffers(subpartition2, 
subpartitionBuffers2)
                         .addSpillBuffers(subpartition1, Arrays.asList(0, 1, 2, 
3))
-                        .addConsumedBuffers(subpartition1, Arrays.asList(0, 1))
                         .addSpillBuffers(subpartition2, Arrays.asList(1, 2, 3))
-                        .addConsumedBuffers(subpartition2, Arrays.asList(0, 1))
                         .setGetNumTotalUnSpillBuffersSupplier(
                                 () -> (int) (10 * 
NUM_BUFFERS_TRIGGER_SPILLING_RATIO))
                         .setGetNumTotalRequestedBuffersSupplier(() -> 10)
@@ -151,47 +148,13 @@ class HsFullSpillingStrategyTest {
         
assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers);
 
         Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new 
HashMap<>();
-        // all consumed spill buffers should release.
         expectedReleaseBuffers.put(
                 subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 
2)));
-        // priority higher buffers should release.
-        
expectedReleaseBuffers.get(subpartition1).addAll(subpartitionBuffers1.subList(3,
 4));
-        // all consumed spill buffers should release.
         expectedReleaseBuffers.put(
-                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
2)));
-        // priority higher buffers should release.
-        
expectedReleaseBuffers.get(subpartition2).addAll(subpartitionBuffers2.subList(2,
 4));
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
3)));
         
assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers);
     }
 
-    /** All consumed buffers that already spill should release regardless of 
the release ratio. */
-    @Test
-    void testDecideActionWithGlobalInfoAllConsumedSpillBufferShouldRelease() {
-        final int subpartitionId = 0;
-        List<BufferIndexAndChannel> subpartitionBuffers =
-                createBufferIndexAndChannelsList(subpartitionId, 0, 1, 2, 3, 
4);
-
-        final int poolSize = 5;
-        TestingSpillingInfoProvider spillInfoProvider =
-                TestingSpillingInfoProvider.builder()
-                        .setGetNumSubpartitionsSupplier(() -> 1)
-                        .addSubpartitionBuffers(subpartitionId, 
subpartitionBuffers)
-                        .addSpillBuffers(subpartitionId, Arrays.asList(0, 1, 
2, 3, 4))
-                        .addConsumedBuffers(subpartitionId, Arrays.asList(0, 
1, 2, 3))
-                        .setGetNumTotalUnSpillBuffersSupplier(() -> 0)
-                        .setGetNumTotalRequestedBuffersSupplier(() -> poolSize)
-                        .setGetPoolSizeSupplier(() -> poolSize)
-                        .build();
-
-        int numReleaseBuffer = (int) (poolSize * FULL_SPILL_RELEASE_RATIO);
-        Decision decision = 
spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
-        assertThat(decision.getBufferToSpill()).isEmpty();
-        assertThat(decision.getBufferToRelease())
-                .containsOnly(entry(subpartitionId, 
subpartitionBuffers.subList(0, 4)))
-                .extractingByKey(subpartitionId)
-                .satisfies((buffers) -> 
assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer));
-    }
-
     @Test
     void testOnResultPartitionClosed() {
         final int subpartition1 = 0;

Reply via email to