This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9331e7502e9e8bda10f149e03280e26d5344e471 Author: Weijie Guo <[email protected]> AuthorDate: Mon Jul 25 13:40:59 2022 +0800 [FLINK-27908] HsBufferContext ignore repeatedly startSpilling and release instead of checkState. --- .../network/partition/hybrid/HsBufferContext.java | 22 +++++++++++---- .../hybrid/HsSubpartitionMemoryDataManager.java | 33 ++++++++++++++-------- .../partition/hybrid/HsBufferContextTest.java | 17 +++++------ 3 files changed, 44 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java index 8feb6fd0c41..4a5f5fd9651 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java @@ -97,24 +97,34 @@ public class HsBufferContext { return Optional.ofNullable(spilledFuture); } + /** Mark buffer status to release. */ public void release() { - checkState(!released, "Release buffer repeatedly is unexpected."); + if (isReleased()) { + return; + } released = true; // decrease ref count when buffer is released from memory. buffer.recycleBuffer(); } - public void startSpilling(CompletableFuture<Void> spilledFuture) { - checkState(!released, "Buffer is already released."); - checkState( - !spillStarted && this.spilledFuture == null, - "Spill buffer repeatedly is unexpected."); + /** + * Mark buffer status to startSpilling. + * + * @param spilledFuture completable future of this buffer's spilling operation. + * @return false, if spilling of the buffer has been started before or the buffer has been + * released already; true, otherwise. + */ + public boolean startSpilling(CompletableFuture<Void> spilledFuture) { + if (isReleased() || isSpillStarted()) { + return false; + } spillStarted = true; this.spilledFuture = spilledFuture; // increase ref count when buffer is decided to spill. buffer.retainBuffer(); // decrease ref count when buffer spilling is finished. spilledFuture.thenRun(buffer::recycleBuffer); + return true; } public void consumed() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java index 56814024911..8084a8883f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java @@ -214,14 +214,16 @@ public class HsSubpartitionMemoryDataManager { .map( indexAndChannel -> { int bufferIndex = indexAndChannel.getBufferIndex(); - HsBufferContext bufferContext = - startSpillingBuffer( - bufferIndex, spillDoneFuture); - return new BufferWithIdentity( - bufferContext.getBuffer(), - bufferIndex, - targetChannel); + return startSpillingBuffer(bufferIndex, spillDoneFuture) + .map( + (context) -> + new BufferWithIdentity( + context.getBuffer(), + bufferIndex, + targetChannel)); }) + .filter(Optional::isPresent) + .map(Optional::get) .collect(Collectors.toList())); } @@ -385,18 +387,25 @@ public class HsSubpartitionMemoryDataManager { @GuardedBy("subpartitionLock") private void releaseBuffer(int bufferIndex) { - HsBufferContext bufferContext = checkNotNull(bufferIndexToContexts.remove(bufferIndex)); + HsBufferContext bufferContext = bufferIndexToContexts.remove(bufferIndex); + if (bufferContext == null) { + return; + } bufferContext.release(); // remove released buffers from head lazy. trimHeadingReleasedBuffers(allBuffers); } @GuardedBy("subpartitionLock") - private HsBufferContext startSpillingBuffer( + private Optional<HsBufferContext> startSpillingBuffer( int bufferIndex, CompletableFuture<Void> spillFuture) { - HsBufferContext bufferContext = checkNotNull(bufferIndexToContexts.get(bufferIndex)); - bufferContext.startSpilling(spillFuture); - return bufferContext; + HsBufferContext bufferContext = bufferIndexToContexts.get(bufferIndex); + if (bufferContext == null) { + return Optional.empty(); + } + return bufferContext.startSpilling(spillFuture) + ? Optional.of(bufferContext) + : Optional.empty(); } @GuardedBy("subpartitionLock") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java index a16b811b68f..960357d4127 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBuffer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link HsBufferContext}. */ @@ -57,10 +58,8 @@ class HsBufferContextTest { @Test void testBufferStartSpillingRepeatedly() { - bufferContext.startSpilling(new CompletableFuture<>()); - assertThatThrownBy(() -> bufferContext.startSpilling(new CompletableFuture<>())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Spill buffer repeatedly is unexpected."); + assertThat(bufferContext.startSpilling(new CompletableFuture<>())).isTrue(); + assertThat(bufferContext.startSpilling(new CompletableFuture<>())).isFalse(); } @Test @@ -75,9 +74,9 @@ class HsBufferContextTest { @Test void testBufferReleaseRepeatedly() { bufferContext.release(); - assertThatThrownBy(() -> bufferContext.release()) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Release buffer repeatedly is unexpected."); + assertThatNoException() + .as("repeatedly release should only recycle buffer once.") + .isThrownBy(() -> bufferContext.release()); } @Test @@ -99,9 +98,7 @@ class HsBufferContextTest { @Test void testBufferStartSpillOrConsumedAfterReleased() { bufferContext.release(); - assertThatThrownBy(() -> bufferContext.startSpilling(new CompletableFuture<>())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Buffer is already released."); + assertThat(bufferContext.startSpilling(new CompletableFuture<>())).isFalse(); assertThatThrownBy(() -> bufferContext.consumed()) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Buffer is already released.");
