This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 50dc51191f5613fea97ba63e3973db8f29346505 Author: Weijie Guo <[email protected]> AuthorDate: Tue Jul 19 01:46:38 2022 +0800 [hotfix] HsSpillingStrategy.Decision return Map group by subpartition id instead of List --- .../partition/hybrid/HsFullSpillingStrategy.java | 9 +++-- .../hybrid/HsSelectiveSpillingStrategy.java | 12 +++---- .../partition/hybrid/HsSpillingStrategy.java | 42 ++++++++++++++-------- .../hybrid/HsFullSpillingStrategyTest.java | 29 +++++++++------ .../hybrid/HsSelectiveSpillingStrategyTest.java | 19 ++++++---- 5 files changed, 66 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index 3c4d58369e0..bc2889627d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -85,13 +85,12 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { return; } // Spill all not spill buffers. - List<BufferIndexAndChannel> unSpillBuffers = new ArrayList<>(); for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { - unSpillBuffers.addAll( + builder.addBufferToSpill( + i, spillingInfoProvider.getBuffersInOrder( i, SpillStatus.NOT_SPILL, ConsumeStatus.ALL)); } - builder.addBufferToSpill(unSpillBuffers); } private void checkRelease( @@ -138,11 +137,11 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { } // collect results in order - List<BufferIndexAndChannel> toRelease = new ArrayList<>(); for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { + List<BufferIndexAndChannel> toRelease = new ArrayList<>(); toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new ArrayDeque<>())); toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new ArrayList<>())); + builder.addBufferToRelease(i, toRelease); } - builder.addBufferToRelease(toRelease); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java index cd8c6edb8ae..3f173e50f2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java @@ -95,13 +95,11 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { spillNum); Decision.Builder builder = Decision.builder(); - subpartitionToHighPriorityBuffers - .values() - .forEach( - buffers -> { - builder.addBufferToSpill(buffers); - builder.addBufferToRelease(buffers); - }); + subpartitionToHighPriorityBuffers.forEach( + (subpartitionId, buffers) -> { + builder.addBufferToSpill(subpartitionId, buffers); + builder.addBufferToRelease(subpartitionId, buffers); + }); return builder.build(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java index 01041aaed26..4525b8fd442 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java @@ -20,7 +20,10 @@ package org.apache.flink.runtime.io.network.partition.hybrid; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -74,26 +77,26 @@ public interface HsSpillingStrategy { */ class Decision { /** A collection of buffer that needs to be spilled to disk. */ - private final List<BufferIndexAndChannel> bufferToSpill; + private final Map<Integer, List<BufferIndexAndChannel>> bufferToSpill; /** A collection of buffer that needs to be released. */ - private final List<BufferIndexAndChannel> bufferToRelease; + private final Map<Integer, List<BufferIndexAndChannel>> bufferToRelease; public static final Decision NO_ACTION = - new Decision(Collections.emptyList(), Collections.emptyList()); + new Decision(Collections.emptyMap(), Collections.emptyMap()); private Decision( - List<BufferIndexAndChannel> bufferToSpill, - List<BufferIndexAndChannel> bufferToRelease) { + Map<Integer, List<BufferIndexAndChannel>> bufferToSpill, + Map<Integer, List<BufferIndexAndChannel>> bufferToRelease) { this.bufferToSpill = bufferToSpill; this.bufferToRelease = bufferToRelease; } - public List<BufferIndexAndChannel> getBufferToSpill() { + public Map<Integer, List<BufferIndexAndChannel>> getBufferToSpill() { return bufferToSpill; } - public List<BufferIndexAndChannel> getBufferToRelease() { + public Map<Integer, List<BufferIndexAndChannel>> getBufferToRelease() { return bufferToRelease; } @@ -104,30 +107,39 @@ public interface HsSpillingStrategy { /** Builder for {@link Decision}. */ static class Builder { /** A collection of buffer that needs to be spilled to disk. */ - private final List<BufferIndexAndChannel> bufferToSpill = new ArrayList<>(); + private final Map<Integer, List<BufferIndexAndChannel>> bufferToSpill = new HashMap<>(); /** A collection of buffer that needs to be released. */ - private final List<BufferIndexAndChannel> bufferToRelease = new ArrayList<>(); + private final Map<Integer, List<BufferIndexAndChannel>> bufferToRelease = + new HashMap<>(); private Builder() {} public Builder addBufferToSpill(BufferIndexAndChannel buffer) { - bufferToSpill.add(buffer); + bufferToSpill.computeIfAbsent(buffer.getChannel(), ArrayList::new).add(buffer); return this; } - public Builder addBufferToSpill(List<BufferIndexAndChannel> buffers) { - bufferToSpill.addAll(buffers); + public Builder addBufferToSpill( + int subpartitionId, List<BufferIndexAndChannel> buffers) { + bufferToSpill.computeIfAbsent(subpartitionId, ArrayList::new).addAll(buffers); + return this; + } + + public Builder addBufferToSpill( + int subpartitionId, Deque<BufferIndexAndChannel> buffers) { + bufferToSpill.computeIfAbsent(subpartitionId, ArrayList::new).addAll(buffers); return this; } public Builder addBufferToRelease(BufferIndexAndChannel buffer) { - bufferToRelease.add(buffer); + bufferToRelease.computeIfAbsent(buffer.getChannel(), ArrayList::new).add(buffer); return this; } - public Builder addBufferToRelease(List<BufferIndexAndChannel> buffers) { - bufferToRelease.addAll(buffers); + public Builder addBufferToRelease( + int subpartitionId, List<BufferIndexAndChannel> buffers) { + bufferToRelease.computeIfAbsent(subpartitionId, ArrayList::new).addAll(buffers); return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index eee3ecb3165..2f7674aa4af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -24,11 +24,14 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; /** Tests for {@link HsFullSpillingStrategy}. */ class HsFullSpillingStrategyTest { @@ -131,21 +134,24 @@ class HsFullSpillingStrategyTest { Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider); // all not spilled buffers need to spill. - ArrayList<BufferIndexAndChannel> expectedSpillBuffers = - new ArrayList<>(subpartitionBuffers1.subList(4, 5)); - expectedSpillBuffers.add(subpartitionBuffers2.get(0)); - expectedSpillBuffers.addAll(subpartitionBuffers2.subList(4, 5)); + Map<Integer, List<BufferIndexAndChannel>> expectedSpillBuffers = new HashMap<>(); + expectedSpillBuffers.put(subpartition1, subpartitionBuffers1.subList(4, 5)); + expectedSpillBuffers.put( + subpartition2, new ArrayList<>(subpartitionBuffers2.subList(0, 1))); + expectedSpillBuffers.get(subpartition2).addAll(subpartitionBuffers2.subList(4, 5)); assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers); - ArrayList<BufferIndexAndChannel> expectedReleaseBuffers = new ArrayList<>(); + Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>(); // all consumed spill buffers should release. - expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(0, 2)); + expectedReleaseBuffers.put( + subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2))); // priority higher buffers should release. - expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(3, 4)); + expectedReleaseBuffers.get(subpartition1).addAll(subpartitionBuffers1.subList(3, 4)); // all consumed spill buffers should release. - expectedReleaseBuffers.addAll(subpartitionBuffers2.subList(1, 2)); + expectedReleaseBuffers.put( + subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 2))); // priority higher buffers should release. - expectedReleaseBuffers.addAll(subpartitionBuffers2.subList(2, 4)); + expectedReleaseBuffers.get(subpartition2).addAll(subpartitionBuffers2.subList(2, 4)); assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers); } @@ -172,7 +178,8 @@ class HsFullSpillingStrategyTest { Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider); assertThat(decision.getBufferToSpill()).isEmpty(); assertThat(decision.getBufferToRelease()) - .isEqualTo(subpartitionBuffers.subList(0, 4)) - .hasSizeGreaterThan(numReleaseBuffer); + .containsOnly(entry(subpartitionId, subpartitionBuffers.subList(0, 4))) + .extractingByKey(subpartitionId) + .satisfies((buffers) -> assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java index eee7be9f409..9e7d7254dba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java @@ -22,10 +22,11 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.D import org.junit.jupiter.api.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; @@ -61,8 +62,12 @@ class HsSelectiveSpillingStrategyTest { (decision -> { assertThat(decision.getBufferToRelease()) .hasSize(1) - .element(0) - .isEqualTo(bufferIndexAndChannel); + .hasEntrySatisfying( + 0, + (list) -> + assertThat(list) + .containsExactly( + bufferIndexAndChannel)); assertThat(decision.getBufferToSpill()).isEmpty(); })); } @@ -120,10 +125,10 @@ class HsSelectiveSpillingStrategyTest { // progress1 + 9 has the highest priority, but it cannot be decided to spill, as its // spillStatus is SPILL. expected buffer's index : progress1 + 6, progress2 + 7, progress3 + // 8 - List<BufferIndexAndChannel> expectedBuffers = new ArrayList<>(); - expectedBuffers.addAll(subpartitionBuffer1.subList(2, 3)); - expectedBuffers.addAll(subpartitionBuffer2.subList(2, 3)); - expectedBuffers.addAll(subpartitionBuffer3.subList(2, 3)); + Map<Integer, List<BufferIndexAndChannel>> expectedBuffers = new HashMap<>(); + expectedBuffers.put(subpartition1, subpartitionBuffer1.subList(2, 3)); + expectedBuffers.put(subpartition2, subpartitionBuffer2.subList(2, 3)); + expectedBuffers.put(subpartition3, subpartitionBuffer3.subList(2, 3)); assertThat(globalDecision.getBufferToSpill()).isEqualTo(expectedBuffers); assertThat(globalDecision.getBufferToRelease()).isEqualTo(expectedBuffers);
