This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 133ffab4f19b91d5a5fba58684e2a30a6b8b31c7 Author: Weijie Guo <[email protected]> AuthorDate: Tue Nov 1 10:42:38 2022 +0800 [FLINK-29818] Calculate the release number instead of the survival number for full spilling strategy --- .../partition/hybrid/HsFullSpillingStrategy.java | 20 ++++++++++------- .../hybrid/HsFullSpillingStrategyTest.java | 25 ++++------------------ .../partition/hybrid/HybridShuffleTestUtils.java | 4 ---- 3 files changed, 16 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index 94a067fbaeb..1d0315a0364 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -128,20 +128,21 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { return; } - int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio); + int releaseNum = (int) (poolSize * releaseBufferRatio); int numSubpartitions = spillingInfoProvider.getNumSubpartitions(); - int subpartitionSurvivedNum = survivedNum / numSubpartitions; - + int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions; TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<>(); for (int subpartitionId = 0; subpartitionId < numSubpartitions; subpartitionId++) { Deque<BufferIndexAndChannel> buffersInOrder = spillingInfoProvider.getBuffersInOrder( subpartitionId, SpillStatus.SPILL, ConsumeStatusWithId.ALL_ANY); - // if the number of subpartition buffers less than survived buffers, reserved all of - // them. - int releaseNum = Math.max(0, buffersInOrder.size() - subpartitionSurvivedNum); - while (releaseNum-- != 0) { + // if the number of subpartition spilling buffers less than expected release number, + // release all of them. + int subpartitionReleaseNum = + Math.min(buffersInOrder.size(), expectedSubpartitionReleaseNum); + int subpartitionSurvivedNum = buffersInOrder.size() - subpartitionReleaseNum; + while (subpartitionSurvivedNum-- != 0) { buffersInOrder.pollLast(); } bufferToRelease.put(subpartitionId, buffersInOrder); @@ -149,7 +150,10 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { // collect results in order for (int i = 0; i < numSubpartitions; i++) { - builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>())); + Deque<BufferIndexAndChannel> bufferIndexAndChannels = bufferToRelease.get(i); + if (bufferIndexAndChannels != null && !bufferIndexAndChannels.isEmpty()) { + builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>())); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index e88918b2c0d..ca4ea9348dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -102,25 +102,10 @@ class HsFullSpillingStrategyTest { final int subpartition1 = 0; final int subpartition2 = 1; - final int progress1 = 10; - final int progress2 = 20; - List<BufferIndexAndChannel> subpartitionBuffers1 = - createBufferIndexAndChannelsList( - subpartition1, - progress1, - progress1 + 2, - progress1 + 4, - progress1 + 6, - progress1 + 8); + createBufferIndexAndChannelsList(subpartition1, 1, 2, 3, 4, 5); List<BufferIndexAndChannel> subpartitionBuffers2 = - createBufferIndexAndChannelsList( - subpartition2, - progress2 + 1, - progress2 + 3, - progress2 + 5, - progress2 + 7, - progress2 + 9); + createBufferIndexAndChannelsList(subpartition2, 1, 2, 3, 4, 5); TestingSpillingInfoProvider spillInfoProvider = TestingSpillingInfoProvider.builder() @@ -133,8 +118,6 @@ class HsFullSpillingStrategyTest { () -> (int) (10 * NUM_BUFFERS_TRIGGER_SPILLING_RATIO)) .setGetNumTotalRequestedBuffersSupplier(() -> 10) .setGetPoolSizeSupplier(() -> 10) - .setGetNextBufferIndexToConsumeSupplier( - () -> Arrays.asList(progress1, progress2)) .build(); Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider); @@ -149,9 +132,9 @@ class HsFullSpillingStrategyTest { Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>(); expectedReleaseBuffers.put( - subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2))); + subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 3))); expectedReleaseBuffers.put( - subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3))); + subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 4))); assertThat(decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java index f32e3179563..bd26b6d56a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.hybrid; -import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.metrics.util.TestCounter; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -39,9 +38,6 @@ public class HybridShuffleTestUtils { int subpartitionId, int... bufferIndexes) { List<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayList<>(); for (int bufferIndex : bufferIndexes) { - MemorySegment segment = - MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE); - NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {}); bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId)); } return bufferIndexAndChannels;
