This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7f338e87a777abd102719dcb772b184770b62b88 Author: Weijie Guo <[email protected]> AuthorDate: Wed Oct 19 19:44:16 2022 +0800 [FLINK-28889] Full spilling strategy no longer considers consuming progress. --- .../partition/hybrid/HsFullSpillingStrategy.java | 58 ++++++++-------------- .../hybrid/HsFullSpillingStrategyTest.java | 39 +-------------- 2 files changed, 21 insertions(+), 76 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index f0339ee152a..ac2d86fcc30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -22,14 +22,10 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvid import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Deque; -import java.util.List; import java.util.Optional; import java.util.TreeMap; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder; - /** A special implementation of {@link HsSpillingStrategy} that spilled all buffers to disk. */ public class HsFullSpillingStrategy implements HsSpillingStrategy { private final float numBuffersTriggerSpillingRatio; @@ -118,6 +114,11 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { } } + /** + * Release subpartition's spilled buffer from head. Each subpartition fairly retains a fixed + * number of buffers, and all the remaining buffers are released. If this subpartition does not + * have so many qualified buffers, all of them will be retained.
+ */ private void checkRelease( HsSpillingInfoProvider spillingInfoProvider, int poolSize, Decision.Builder builder) { if (spillingInfoProvider.getNumTotalRequestedBuffers() < poolSize * releaseThreshold) { @@ -125,47 +126,28 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { return; } - int releaseNum = (int) (spillingInfoProvider.getPoolSize() * releaseBufferRatio); + int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio); + int numSubpartitions = spillingInfoProvider.getNumSubpartitions(); + int subpartitionSurvivedNum = survivedNum / numSubpartitions; - // first, release all consumed buffers - TreeMap<Integer, Deque<BufferIndexAndChannel>> consumedBuffersToRelease = new TreeMap<>(); - int numConsumedBuffers = 0; - for (int subpartitionId = 0; - subpartitionId < spillingInfoProvider.getNumSubpartitions(); - subpartitionId++) { + TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<>(); - Deque<BufferIndexAndChannel> consumedSpillSubpartitionBuffers = + for (int subpartitionId = 0; subpartitionId < numSubpartitions; subpartitionId++) { + Deque<BufferIndexAndChannel> buffersInOrder = spillingInfoProvider.getBuffersInOrder( - subpartitionId, SpillStatus.SPILL, ConsumeStatus.CONSUMED); - numConsumedBuffers += consumedSpillSubpartitionBuffers.size(); - consumedBuffersToRelease.put(subpartitionId, consumedSpillSubpartitionBuffers); - } - - // make up the releaseNum with unconsumed buffers, if needed, w.r.t. 
the consuming priority - TreeMap<Integer, List<BufferIndexAndChannel>> unconsumedBufferToRelease = new TreeMap<>(); - if (releaseNum > numConsumedBuffers) { - TreeMap<Integer, Deque<BufferIndexAndChannel>> unconsumedBuffers = new TreeMap<>(); - for (int subpartitionId = 0; - subpartitionId < spillingInfoProvider.getNumSubpartitions(); - subpartitionId++) { - unconsumedBuffers.put( - subpartitionId, - spillingInfoProvider.getBuffersInOrder( - subpartitionId, SpillStatus.SPILL, ConsumeStatus.NOT_CONSUMED)); + subpartitionId, SpillStatus.SPILL, ConsumeStatus.ALL); + // if the number of subpartition buffers less than survived buffers, reserved all of + // them. + int releaseNum = Math.max(0, buffersInOrder.size() - subpartitionSurvivedNum); + while (releaseNum-- != 0) { + buffersInOrder.pollLast(); } - unconsumedBufferToRelease.putAll( - getBuffersByConsumptionPriorityInOrder( - spillingInfoProvider.getNextBufferIndexToConsume(), - unconsumedBuffers, - releaseNum - numConsumedBuffers)); + bufferToRelease.put(subpartitionId, buffersInOrder); } // collect results in order - for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { - List<BufferIndexAndChannel> toRelease = new ArrayList<>(); - toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new ArrayDeque<>())); - toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new ArrayList<>())); - builder.addBufferToRelease(i, toRelease); + for (int i = 0; i < numSubpartitions; i++) { + builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>())); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index 3602b15b2d1..e88918b2c0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -32,7 +32,6 @@ import java.util.Optional; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.entry; /** Tests for {@link HsFullSpillingStrategy}. */ class HsFullSpillingStrategyTest { @@ -129,9 +128,7 @@ class HsFullSpillingStrategyTest { .addSubpartitionBuffers(subpartition1, subpartitionBuffers1) .addSubpartitionBuffers(subpartition2, subpartitionBuffers2) .addSpillBuffers(subpartition1, Arrays.asList(0, 1, 2, 3)) - .addConsumedBuffers(subpartition1, Arrays.asList(0, 1)) .addSpillBuffers(subpartition2, Arrays.asList(1, 2, 3)) - .addConsumedBuffers(subpartition2, Arrays.asList(0, 1)) .setGetNumTotalUnSpillBuffersSupplier( () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO)) .setGetNumTotalRequestedBuffersSupplier(() -> 10) @@ -151,47 +148,13 @@ class HsFullSpillingStrategyTest { assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers); Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>(); - // all consumed spill buffers should release. expectedReleaseBuffers.put( subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2))); - // priority higher buffers should release. - expectedReleaseBuffers.get(subpartition1).addAll(subpartitionBuffers1.subList(3, 4)); - // all consumed spill buffers should release. expectedReleaseBuffers.put( - subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 2))); - // priority higher buffers should release. 
- expectedReleaseBuffers.get(subpartition2).addAll(subpartitionBuffers2.subList(2, 4)); + subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3))); assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers); } - /** All consumed buffers that already spill should release regardless of the release ratio. */ - @Test - void testDecideActionWithGlobalInfoAllConsumedSpillBufferShouldRelease() { - final int subpartitionId = 0; - List<BufferIndexAndChannel> subpartitionBuffers = - createBufferIndexAndChannelsList(subpartitionId, 0, 1, 2, 3, 4); - - final int poolSize = 5; - TestingSpillingInfoProvider spillInfoProvider = - TestingSpillingInfoProvider.builder() - .setGetNumSubpartitionsSupplier(() -> 1) - .addSubpartitionBuffers(subpartitionId, subpartitionBuffers) - .addSpillBuffers(subpartitionId, Arrays.asList(0, 1, 2, 3, 4)) - .addConsumedBuffers(subpartitionId, Arrays.asList(0, 1, 2, 3)) - .setGetNumTotalUnSpillBuffersSupplier(() -> 0) - .setGetNumTotalRequestedBuffersSupplier(() -> poolSize) - .setGetPoolSizeSupplier(() -> poolSize) - .build(); - - int numReleaseBuffer = (int) (poolSize * FULL_SPILL_RELEASE_RATIO); - Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider); - assertThat(decision.getBufferToSpill()).isEmpty(); - assertThat(decision.getBufferToRelease()) - .containsOnly(entry(subpartitionId, subpartitionBuffers.subList(0, 4))) - .extractingByKey(subpartitionId) - .satisfies((buffers) -> assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer)); - } - @Test void testOnResultPartitionClosed() { final int subpartition1 = 0;
