This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0912765acc2 [FLINK-28077][checkpoint] Allow buffers to be discarded asynchronously
0912765acc2 is described below

commit 0912765acc25f179169f8e371683acaf3e9133a2
Author: 1996fanrui <[email protected]>
AuthorDate: Tue Jun 21 20:23:47 2022 +0800

    [FLINK-28077][checkpoint] Allow buffers to be discarded asynchronously
    
    The dataFuture may never be completed, so waiting for that might cause the task to get stuck.
    
    We now chain the cleanup to the dataFuture instead; if it is complete it will run immediately, otherwise it _may_ run at a later point if the future ever completes.
    The latter is a safeguard against potential buffer leaks.
---
 .../channel/ChannelStateWriteRequest.java          | 24 ++++++++++++-----
 .../ChannelStateWriteRequestExecutorImplTest.java  | 30 +++++++++++++++++++---
 2 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index 91a96a1d886..4b706045d0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -24,6 +24,9 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -40,6 +43,9 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 interface ChannelStateWriteRequest {
+
+    Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequest.class);
+
     long getCheckpointId();
 
     void cancel(Throwable cause) throws Exception;
@@ -108,12 +114,18 @@ interface ChannelStateWriteRequest {
                         bufferConsumer.accept(writer, buffer);
                     }
                 },
-                throwable -> {
-                    try {
-                        CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
-                    } catch (ExecutionException ignored) {
-                    }
-                },
+                throwable ->
+                        dataFuture.thenAccept(
+                                buffers -> {
+                                    try {
+                                        CloseableIterator.fromList(buffers, Buffer::recycleBuffer)
+                                                .close();
+                                    } catch (Exception e) {
+                                        LOG.error(
+                                                "Failed to recycle the output buffer of channel state.",
+                                                e);
+                                    }
+                                }),
                 false);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
index db0dcffcd7b..1d199985d96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint.channel;
 
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.BiConsumerWithException;
 
@@ -26,9 +27,11 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher.NO_OP;
+import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -129,11 +132,32 @@ public class ChannelStateWriteRequestExecutorImplTest {
     }
 
     @Test
-    public void testCanBeClosed() throws IOException {
-        TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
+    public void testCanBeClosed() throws Exception {
+        long checkpointId = 1L;
+        ChannelStateWriteRequestDispatcher processor =
+                new ChannelStateWriteRequestDispatcherImpl(
+                        "dummy task",
+                        0,
+                        getStreamFactoryFactory(),
+                        new ChannelStateSerializerImpl());
         try (ChannelStateWriteRequestExecutorImpl worker =
-                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, requestProcessor)) {
+                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, processor)) {
             worker.start();
+            worker.submit(
+                    new CheckpointStartRequest(
+                            checkpointId,
+                            new ChannelStateWriter.ChannelStateWriteResult(),
+                            CheckpointStorageLocationReference.getDefault()));
+            worker.submit(
+                    ChannelStateWriteRequest.write(
+                            checkpointId,
+                            new ResultSubpartitionInfo(0, 0),
+                            new CompletableFuture<>()));
+            worker.submit(
+                    ChannelStateWriteRequest.write(
+                            checkpointId,
+                            new ResultSubpartitionInfo(0, 0),
+                            new CompletableFuture<>()));
         }
     }
 

Reply via email to