This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 828b3a58aca91e21b2fd328448b977d460e6e369 Author: Weijie Guo <[email protected]> AuthorDate: Thu Jul 28 18:31:48 2022 +0800 [FLINK-27908] Extends onResultPartitionClosed to HsSpillingStrategy. --- .../partition/hybrid/HsFullSpillingStrategy.java | 20 +++++++++++++ .../hybrid/HsSelectiveSpillingStrategy.java | 20 +++++++++++++ .../partition/hybrid/HsSpillingStrategy.java | 15 ++++++++++ .../hybrid/HsFullSpillingStrategyTest.java | 33 ++++++++++++++++++++++ .../hybrid/HsSelectiveSpillingStrategyTest.java | 32 +++++++++++++++++++++ .../partition/hybrid/TestingSpillingStrategy.java | 23 +++++++++++++-- 6 files changed, 141 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index bc2889627d8..cfc737d2efd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -79,6 +79,26 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { return builder.build(); } + @Override + public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) { + Decision.Builder builder = Decision.builder(); + for (int subpartitionId = 0; + subpartitionId < spillingInfoProvider.getNumSubpartitions(); + subpartitionId++) { + builder.addBufferToSpill( + subpartitionId, + // get all not start spilling buffers. + spillingInfoProvider.getBuffersInOrder( + subpartitionId, SpillStatus.NOT_SPILL, ConsumeStatus.ALL)) + .addBufferToRelease( + subpartitionId, + // get all not released buffers. + spillingInfoProvider.getBuffersInOrder( + subpartitionId, SpillStatus.ALL, ConsumeStatus.ALL)); + } + return builder.build(); + } + private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Builder builder) { if (spillingInfoProvider.getNumTotalUnSpillBuffers() < numBuffersTriggerSpilling) { // In case situation changed since onBufferFinished() returns Optional#empty() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java index 3f173e50f2a..dcd53393bdf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java @@ -102,4 +102,24 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { }); return builder.build(); } + + @Override + public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) { + Decision.Builder builder = Decision.builder(); + for (int subpartitionId = 0; + subpartitionId < spillingInfoProvider.getNumSubpartitions(); + subpartitionId++) { + builder.addBufferToSpill( + subpartitionId, + // get all not start spilling buffers. + spillingInfoProvider.getBuffersInOrder( + subpartitionId, SpillStatus.NOT_SPILL, ConsumeStatus.ALL)) + .addBufferToRelease( + subpartitionId, + // get all not released buffers. + spillingInfoProvider.getBuffersInOrder( + subpartitionId, SpillStatus.ALL, ConsumeStatus.ALL)); + } + return builder.build(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java index 4525b8fd442..fb4a5ab58d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java @@ -71,6 +71,15 @@ public interface HsSpillingStrategy { */ Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider); + /** + * Make a decision when result partition is closed. Because this method will directly touch the + * {@link HsSpillingInfoProvider}, the caller should take care of the thread safety. + * + * @param spillingInfoProvider that provides information about the current status. + * @return A {@link Decision} based on the global information. + */ + Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider); + /** * This class represents the spill and release decision made by {@link HsSpillingStrategy}, in * other words, which data is to be spilled and which data is to be released. @@ -143,6 +152,12 @@ public interface HsSpillingStrategy { return this; } + public Builder addBufferToRelease( + int subpartitionId, Deque<BufferIndexAndChannel> buffers) { + bufferToRelease.computeIfAbsent(subpartitionId, ArrayList::new).addAll(buffers); + return this; + } + public Decision build() { return new Decision(bufferToSpill, bufferToRelease); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index b8d1289efb8..44701cbab76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -182,4 +183,36 @@ class HsFullSpillingStrategyTest { .extractingByKey(subpartitionId) .satisfies((buffers) -> assertThat(buffers).hasSizeGreaterThan(numReleaseBuffer)); } + + @Test + void testOnResultPartitionClosed() { + final int subpartition1 = 0; + final int subpartition2 = 1; + + List<BufferIndexAndChannel> subpartitionBuffer1 = + createBufferIndexAndChannelsList(subpartition1, 0, 1, 2, 3); + List<BufferIndexAndChannel> subpartitionBuffer2 = + createBufferIndexAndChannelsList(subpartition2, 0, 1, 2); + TestingSpillingInfoProvider spillInfoProvider = + TestingSpillingInfoProvider.builder() + .setGetNumSubpartitionsSupplier(() -> 2) + .addSubpartitionBuffers(subpartition1, subpartitionBuffer1) + .addSubpartitionBuffers(subpartition2, subpartitionBuffer2) + .addSpillBuffers(subpartition1, Arrays.asList(2, 3)) + .addConsumedBuffers(subpartition1, Collections.singletonList(0)) + .addSpillBuffers(subpartition2, Collections.singletonList(2)) + .build(); + + Decision decision = spillStrategy.onResultPartitionClosed(spillInfoProvider); + + Map<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<>(); + expectedToSpillBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 2)); + expectedToSpillBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 2)); + assertThat(decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers); + + Map<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<>(); + expectedToReleaseBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 4)); + expectedToReleaseBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 3)); + assertThat(decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java index 6862c4c6e4a..dfeb7a0f425 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java @@ -133,4 +133,36 @@ class HsSelectiveSpillingStrategyTest { assertThat(globalDecision.getBufferToSpill()).isEqualTo(expectedBuffers); assertThat(globalDecision.getBufferToRelease()).isEqualTo(expectedBuffers); } + + @Test + void testOnResultPartitionClosed() { + final int subpartition1 = 0; + final int subpartition2 = 1; + + List<BufferIndexAndChannel> subpartitionBuffer1 = + createBufferIndexAndChannelsList(subpartition1, 0, 1, 2, 3); + List<BufferIndexAndChannel> subpartitionBuffer2 = + createBufferIndexAndChannelsList(subpartition2, 0, 1, 2); + TestingSpillingInfoProvider spillInfoProvider = + TestingSpillingInfoProvider.builder() + .setGetNumSubpartitionsSupplier(() -> 2) + .addSubpartitionBuffers(subpartition1, subpartitionBuffer1) + .addSubpartitionBuffers(subpartition2, subpartitionBuffer2) + .addSpillBuffers(subpartition1, Arrays.asList(2, 3)) + .addConsumedBuffers(subpartition1, Collections.singletonList(0)) + .addSpillBuffers(subpartition2, Collections.singletonList(2)) + .build(); + + Decision decision = spillStrategy.onResultPartitionClosed(spillInfoProvider); + + Map<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<>(); + expectedToSpillBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 2)); + expectedToSpillBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 2)); + assertThat(decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers); + + Map<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<>(); + expectedToReleaseBuffers.put(subpartition1, subpartitionBuffer1.subList(0, 4)); + expectedToReleaseBuffers.put(subpartition2, subpartitionBuffer2.subList(0, 3)); + assertThat(decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java index 4cce53db0a9..61a730fc5c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingStrategy.java @@ -32,15 +32,19 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { private final Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction; + private final Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction; + private TestingSpillingStrategy( BiFunction<Integer, Integer, Optional<Decision>> onMemoryUsageChangedFunction, Function<Integer, Optional<Decision>> onBufferFinishedFunction, Function<BufferIndexAndChannel, Optional<Decision>> onBufferConsumedFunction, - Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction) { + Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction, + Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) { this.onMemoryUsageChangedFunction = onMemoryUsageChangedFunction; this.onBufferFinishedFunction = onBufferFinishedFunction; this.onBufferConsumedFunction = onBufferConsumedFunction; this.decideActionWithGlobalInfoFunction = decideActionWithGlobalInfoFunction; + this.onResultPartitionClosedFunction = onResultPartitionClosedFunction; } @Override @@ -64,6 +68,11 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { return decideActionWithGlobalInfoFunction.apply(spillingInfoProvider); } + @Override + public Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) { + return onResultPartitionClosedFunction.apply(spillingInfoProvider); + } + public static Builder builder() { return new Builder(); } @@ -82,6 +91,9 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { private Function<HsSpillingInfoProvider, Decision> decideActionWithGlobalInfoFunction = (ignore) -> Decision.NO_ACTION; + private Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction = + (ignore) -> Decision.NO_ACTION; + private Builder() {} public Builder setOnMemoryUsageChangedFunction( @@ -108,12 +120,19 @@ public class TestingSpillingStrategy implements HsSpillingStrategy { return this; } + public Builder setOnResultPartitionClosedFunction( + Function<HsSpillingInfoProvider, Decision> onResultPartitionClosedFunction) { + this.onResultPartitionClosedFunction = onResultPartitionClosedFunction; + return this; + } + public TestingSpillingStrategy build() { return new TestingSpillingStrategy( onMemoryUsageChangedFunction, onBufferFinishedFunction, onBufferConsumedFunction, - decideActionWithGlobalInfoFunction); + decideActionWithGlobalInfoFunction, + onResultPartitionClosedFunction); } } }
