This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9331e7502e9e8bda10f149e03280e26d5344e471
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Jul 25 13:40:59 2022 +0800

    [FLINK-27908] HsBufferContext ignore repeatedly startSpilling and release 
instead of checkState.
---
 .../network/partition/hybrid/HsBufferContext.java  | 22 +++++++++++----
 .../hybrid/HsSubpartitionMemoryDataManager.java    | 33 ++++++++++++++--------
 .../partition/hybrid/HsBufferContextTest.java      | 17 +++++------
 3 files changed, 44 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
index 8feb6fd0c41..4a5f5fd9651 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java
@@ -97,24 +97,34 @@ public class HsBufferContext {
         return Optional.ofNullable(spilledFuture);
     }
 
+    /** Mark buffer status to release. */
     public void release() {
-        checkState(!released, "Release buffer repeatedly is unexpected.");
+        if (isReleased()) {
+            return;
+        }
         released = true;
         // decrease ref count when buffer is released from memory.
         buffer.recycleBuffer();
     }
 
-    public void startSpilling(CompletableFuture<Void> spilledFuture) {
-        checkState(!released, "Buffer is already released.");
-        checkState(
-                !spillStarted && this.spilledFuture == null,
-                "Spill buffer repeatedly is unexpected.");
+    /**
+     * Mark buffer status to startSpilling.
+     *
+     * @param spilledFuture completable future of this buffer's spilling 
operation.
+     * @return false, if spilling of the buffer has been started before or the 
buffer has been
+     *     released already; true, otherwise.
+     */
+    public boolean startSpilling(CompletableFuture<Void> spilledFuture) {
+        if (isReleased() || isSpillStarted()) {
+            return false;
+        }
         spillStarted = true;
         this.spilledFuture = spilledFuture;
         // increase ref count when buffer is decided to spill.
         buffer.retainBuffer();
         // decrease ref count when buffer spilling is finished.
         spilledFuture.thenRun(buffer::recycleBuffer);
+        return true;
     }
 
     public void consumed() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index 56814024911..8084a8883f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -214,14 +214,16 @@ public class HsSubpartitionMemoryDataManager {
                                 .map(
                                         indexAndChannel -> {
                                             int bufferIndex = 
indexAndChannel.getBufferIndex();
-                                            HsBufferContext bufferContext =
-                                                    startSpillingBuffer(
-                                                            bufferIndex, 
spillDoneFuture);
-                                            return new BufferWithIdentity(
-                                                    bufferContext.getBuffer(),
-                                                    bufferIndex,
-                                                    targetChannel);
+                                            return 
startSpillingBuffer(bufferIndex, spillDoneFuture)
+                                                    .map(
+                                                            (context) ->
+                                                                    new 
BufferWithIdentity(
+                                                                            
context.getBuffer(),
+                                                                            
bufferIndex,
+                                                                            
targetChannel));
                                         })
+                                .filter(Optional::isPresent)
+                                .map(Optional::get)
                                 .collect(Collectors.toList()));
     }
 
@@ -385,18 +387,25 @@ public class HsSubpartitionMemoryDataManager {
 
     @GuardedBy("subpartitionLock")
     private void releaseBuffer(int bufferIndex) {
-        HsBufferContext bufferContext = 
checkNotNull(bufferIndexToContexts.remove(bufferIndex));
+        HsBufferContext bufferContext = 
bufferIndexToContexts.remove(bufferIndex);
+        if (bufferContext == null) {
+            return;
+        }
         bufferContext.release();
         // remove released buffers from head lazy.
         trimHeadingReleasedBuffers(allBuffers);
     }
 
     @GuardedBy("subpartitionLock")
-    private HsBufferContext startSpillingBuffer(
+    private Optional<HsBufferContext> startSpillingBuffer(
             int bufferIndex, CompletableFuture<Void> spillFuture) {
-        HsBufferContext bufferContext = 
checkNotNull(bufferIndexToContexts.get(bufferIndex));
-        bufferContext.startSpilling(spillFuture);
-        return bufferContext;
+        HsBufferContext bufferContext = bufferIndexToContexts.get(bufferIndex);
+        if (bufferContext == null) {
+            return Optional.empty();
+        }
+        return bufferContext.startSpilling(spillFuture)
+                ? Optional.of(bufferContext)
+                : Optional.empty();
     }
 
     @GuardedBy("subpartitionLock")
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
index a16b811b68f..960357d4127 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContextTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBuffer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link HsBufferContext}. */
@@ -57,10 +58,8 @@ class HsBufferContextTest {
 
     @Test
     void testBufferStartSpillingRepeatedly() {
-        bufferContext.startSpilling(new CompletableFuture<>());
-        assertThatThrownBy(() -> bufferContext.startSpilling(new 
CompletableFuture<>()))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Spill buffer repeatedly is 
unexpected.");
+        assertThat(bufferContext.startSpilling(new 
CompletableFuture<>())).isTrue();
+        assertThat(bufferContext.startSpilling(new 
CompletableFuture<>())).isFalse();
     }
 
     @Test
@@ -75,9 +74,9 @@ class HsBufferContextTest {
     @Test
     void testBufferReleaseRepeatedly() {
         bufferContext.release();
-        assertThatThrownBy(() -> bufferContext.release())
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Release buffer repeatedly is 
unexpected.");
+        assertThatNoException()
+                .as("repeatedly release should only recycle buffer once.")
+                .isThrownBy(() -> bufferContext.release());
     }
 
     @Test
@@ -99,9 +98,7 @@ class HsBufferContextTest {
     @Test
     void testBufferStartSpillOrConsumedAfterReleased() {
         bufferContext.release();
-        assertThatThrownBy(() -> bufferContext.startSpilling(new 
CompletableFuture<>()))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Buffer is already released.");
+        assertThat(bufferContext.startSpilling(new 
CompletableFuture<>())).isFalse();
         assertThatThrownBy(() -> bufferContext.consumed())
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("Buffer is already released.");

Reply via email to