This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 50dc51191f5613fea97ba63e3973db8f29346505
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Jul 19 01:46:38 2022 +0800

    [hotfix] HsSpillingStrategy.Decision returns a Map grouped by subpartition id 
instead of a List
---
 .../partition/hybrid/HsFullSpillingStrategy.java   |  9 +++--
 .../hybrid/HsSelectiveSpillingStrategy.java        | 12 +++----
 .../partition/hybrid/HsSpillingStrategy.java       | 42 ++++++++++++++--------
 .../hybrid/HsFullSpillingStrategyTest.java         | 29 +++++++++------
 .../hybrid/HsSelectiveSpillingStrategyTest.java    | 19 ++++++----
 5 files changed, 66 insertions(+), 45 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index 3c4d58369e0..bc2889627d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -85,13 +85,12 @@ public class HsFullSpillingStrategy implements 
HsSpillingStrategy {
             return;
         }
         // Spill all not spill buffers.
-        List<BufferIndexAndChannel> unSpillBuffers = new ArrayList<>();
         for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) {
-            unSpillBuffers.addAll(
+            builder.addBufferToSpill(
+                    i,
                     spillingInfoProvider.getBuffersInOrder(
                             i, SpillStatus.NOT_SPILL, ConsumeStatus.ALL));
         }
-        builder.addBufferToSpill(unSpillBuffers);
     }
 
     private void checkRelease(
@@ -138,11 +137,11 @@ public class HsFullSpillingStrategy implements 
HsSpillingStrategy {
         }
 
         // collect results in order
-        List<BufferIndexAndChannel> toRelease = new ArrayList<>();
         for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) {
+            List<BufferIndexAndChannel> toRelease = new ArrayList<>();
             toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new 
ArrayDeque<>()));
             toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new 
ArrayList<>()));
+            builder.addBufferToRelease(i, toRelease);
         }
-        builder.addBufferToRelease(toRelease);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index cd8c6edb8ae..3f173e50f2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -95,13 +95,11 @@ public class HsSelectiveSpillingStrategy implements 
HsSpillingStrategy {
                         spillNum);
 
         Decision.Builder builder = Decision.builder();
-        subpartitionToHighPriorityBuffers
-                .values()
-                .forEach(
-                        buffers -> {
-                            builder.addBufferToSpill(buffers);
-                            builder.addBufferToRelease(buffers);
-                        });
+        subpartitionToHighPriorityBuffers.forEach(
+                (subpartitionId, buffers) -> {
+                    builder.addBufferToSpill(subpartitionId, buffers);
+                    builder.addBufferToRelease(subpartitionId, buffers);
+                });
         return builder.build();
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index 01041aaed26..4525b8fd442 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -20,7 +20,10 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -74,26 +77,26 @@ public interface HsSpillingStrategy {
      */
     class Decision {
         /** A collection of buffer that needs to be spilled to disk. */
-        private final List<BufferIndexAndChannel> bufferToSpill;
+        private final Map<Integer, List<BufferIndexAndChannel>> bufferToSpill;
 
         /** A collection of buffer that needs to be released. */
-        private final List<BufferIndexAndChannel> bufferToRelease;
+        private final Map<Integer, List<BufferIndexAndChannel>> 
bufferToRelease;
 
         public static final Decision NO_ACTION =
-                new Decision(Collections.emptyList(), Collections.emptyList());
+                new Decision(Collections.emptyMap(), Collections.emptyMap());
 
         private Decision(
-                List<BufferIndexAndChannel> bufferToSpill,
-                List<BufferIndexAndChannel> bufferToRelease) {
+                Map<Integer, List<BufferIndexAndChannel>> bufferToSpill,
+                Map<Integer, List<BufferIndexAndChannel>> bufferToRelease) {
             this.bufferToSpill = bufferToSpill;
             this.bufferToRelease = bufferToRelease;
         }
 
-        public List<BufferIndexAndChannel> getBufferToSpill() {
+        public Map<Integer, List<BufferIndexAndChannel>> getBufferToSpill() {
             return bufferToSpill;
         }
 
-        public List<BufferIndexAndChannel> getBufferToRelease() {
+        public Map<Integer, List<BufferIndexAndChannel>> getBufferToRelease() {
             return bufferToRelease;
         }
 
@@ -104,30 +107,39 @@ public interface HsSpillingStrategy {
         /** Builder for {@link Decision}. */
         static class Builder {
             /** A collection of buffer that needs to be spilled to disk. */
-            private final List<BufferIndexAndChannel> bufferToSpill = new 
ArrayList<>();
+            private final Map<Integer, List<BufferIndexAndChannel>> 
bufferToSpill = new HashMap<>();
 
             /** A collection of buffer that needs to be released. */
-            private final List<BufferIndexAndChannel> bufferToRelease = new 
ArrayList<>();
+            private final Map<Integer, List<BufferIndexAndChannel>> 
bufferToRelease =
+                    new HashMap<>();
 
             private Builder() {}
 
             public Builder addBufferToSpill(BufferIndexAndChannel buffer) {
-                bufferToSpill.add(buffer);
+                bufferToSpill.computeIfAbsent(buffer.getChannel(), 
ArrayList::new).add(buffer);
                 return this;
             }
 
-            public Builder addBufferToSpill(List<BufferIndexAndChannel> 
buffers) {
-                bufferToSpill.addAll(buffers);
+            public Builder addBufferToSpill(
+                    int subpartitionId, List<BufferIndexAndChannel> buffers) {
+                bufferToSpill.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
+                return this;
+            }
+
+            public Builder addBufferToSpill(
+                    int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
+                bufferToSpill.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
                 return this;
             }
 
             public Builder addBufferToRelease(BufferIndexAndChannel buffer) {
-                bufferToRelease.add(buffer);
+                bufferToRelease.computeIfAbsent(buffer.getChannel(), 
ArrayList::new).add(buffer);
                 return this;
             }
 
-            public Builder addBufferToRelease(List<BufferIndexAndChannel> 
buffers) {
-                bufferToRelease.addAll(buffers);
+            public Builder addBufferToRelease(
+                    int subpartitionId, List<BufferIndexAndChannel> buffers) {
+                bufferToRelease.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
                 return this;
             }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index eee3ecb3165..2f7674aa4af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -24,11 +24,14 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
 
 /** Tests for {@link HsFullSpillingStrategy}. */
 class HsFullSpillingStrategyTest {
@@ -131,21 +134,24 @@ class HsFullSpillingStrategyTest {
         Decision decision = 
spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
 
         // all not spilled buffers need to spill.
-        ArrayList<BufferIndexAndChannel> expectedSpillBuffers =
-                new ArrayList<>(subpartitionBuffers1.subList(4, 5));
-        expectedSpillBuffers.add(subpartitionBuffers2.get(0));
-        expectedSpillBuffers.addAll(subpartitionBuffers2.subList(4, 5));
+        Map<Integer, List<BufferIndexAndChannel>> expectedSpillBuffers = new 
HashMap<>();
+        expectedSpillBuffers.put(subpartition1, 
subpartitionBuffers1.subList(4, 5));
+        expectedSpillBuffers.put(
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(0, 
1)));
+        
expectedSpillBuffers.get(subpartition2).addAll(subpartitionBuffers2.subList(4, 
5));
         
assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers);
 
-        ArrayList<BufferIndexAndChannel> expectedReleaseBuffers = new 
ArrayList<>();
+        Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new 
HashMap<>();
         // all consumed spill buffers should release.
-        expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(0, 2));
+        expectedReleaseBuffers.put(
+                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 
2)));
         // priority higher buffers should release.
-        expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(3, 4));
+        
expectedReleaseBuffers.get(subpartition1).addAll(subpartitionBuffers1.subList(3,
 4));
         // all consumed spill buffers should release.
-        expectedReleaseBuffers.addAll(subpartitionBuffers2.subList(1, 2));
+        expectedReleaseBuffers.put(
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
2)));
         // priority higher buffers should release.
-        expectedReleaseBuffers.addAll(subpartitionBuffers2.subList(2, 4));
+        
expectedReleaseBuffers.get(subpartition2).addAll(subpartitionBuffers2.subList(2,
 4));
         
assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers);
     }
 
@@ -172,7 +178,8 @@ class HsFullSpillingStrategyTest {
         Decision decision = 
spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
         assertThat(decision.getBufferToSpill()).isEmpty();
         assertThat(decision.getBufferToRelease())
-                .isEqualTo(subpartitionBuffers.subList(0, 4))
-                .hasSizeGreaterThan(numReleaseBuffer);
+                .containsOnly(entry(subpartitionId, 
subpartitionBuffers.subList(0, 4)))
+                .extractingByKey(subpartitionId)
+                .satisfies((buffers) -> 
assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
index eee7be9f409..9e7d7254dba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
@@ -22,10 +22,11 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.D
 
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList;
@@ -61,8 +62,12 @@ class HsSelectiveSpillingStrategyTest {
                         (decision -> {
                             assertThat(decision.getBufferToRelease())
                                     .hasSize(1)
-                                    .element(0)
-                                    .isEqualTo(bufferIndexAndChannel);
+                                    .hasEntrySatisfying(
+                                            0,
+                                            (list) ->
+                                                    assertThat(list)
+                                                            .containsExactly(
+                                                                    
bufferIndexAndChannel));
                             assertThat(decision.getBufferToSpill()).isEmpty();
                         }));
     }
@@ -120,10 +125,10 @@ class HsSelectiveSpillingStrategyTest {
         // progress1 + 9 has the highest priority, but it cannot be decided to 
spill, as its
         // spillStatus is SPILL. expected buffer's index : progress1 + 6, 
progress2 + 7, progress3 +
         // 8
-        List<BufferIndexAndChannel> expectedBuffers = new ArrayList<>();
-        expectedBuffers.addAll(subpartitionBuffer1.subList(2, 3));
-        expectedBuffers.addAll(subpartitionBuffer2.subList(2, 3));
-        expectedBuffers.addAll(subpartitionBuffer3.subList(2, 3));
+        Map<Integer, List<BufferIndexAndChannel>> expectedBuffers = new 
HashMap<>();
+        expectedBuffers.put(subpartition1, subpartitionBuffer1.subList(2, 3));
+        expectedBuffers.put(subpartition2, subpartitionBuffer2.subList(2, 3));
+        expectedBuffers.put(subpartition3, subpartitionBuffer3.subList(2, 3));
 
         
assertThat(globalDecision.getBufferToSpill()).isEqualTo(expectedBuffers);
         
assertThat(globalDecision.getBufferToRelease()).isEqualTo(expectedBuffers);

Reply via email to