1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r878825033


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -103,18 +104,21 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                             checkBufferIsBuffer(buffer);
                             bufferConsumer.accept(writer, buffer);
                         }
-                    } catch (Throwable e) {
+                    } catch (ExecutionException e) {
                         writer.fail(e);
                     }
                 },
                 throwable -> {
-                    List<Buffer> buffers = dataFuture.get();
-                    if (buffers == null || buffers.isEmpty()) {
-                        return;
+                    try {
+                        List<Buffer> buffers = dataFuture.get();
+                        if (buffers == null || buffers.isEmpty()) {
+                            return;
+                        }
+                        CloseableIterator<Buffer> iterator =
+                                CloseableIterator.fromList(buffers, Buffer::recycleBuffer);
+                        iterator.close();
+                    } catch (ExecutionException ignored) {

Review Comment:
   Hi @pnowojski .
   
   ## Conclusion first: we must add a catch here.
   
   When a checkpoint is aborted, `ChannelStateFuture.completeExceptionally(e)` is
   executed, and `dataFuture.get()` then throws an ExecutionException.
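   
   A minimal sketch of that behavior, assuming `dataFuture` behaves like a plain
   `java.util.concurrent.CompletableFuture` (the class name, the `String` element
   type, and the exception message are made up for illustration):
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   
   class AbortedFutureDemo {
       // Returns the message of the cause that a caller of get() observes
       // after the future has been completed exceptionally.
       static String causeSeenByGet() throws InterruptedException {
           CompletableFuture<List<String>> dataFuture = new CompletableFuture<>();
           // Stand-in for what happens when the checkpoint is aborted.
           dataFuture.completeExceptionally(new IllegalStateException("checkpoint aborted"));
           try {
               dataFuture.get();
               return "no exception";
           } catch (ExecutionException e) {
               // get() wraps the original failure; it is available as the cause.
               return e.getCause().getMessage();
           }
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println(causeSeenByGet());
       }
   }
   ```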
   
   Without a catch here, the action propagates the ExecutionException, which
   terminates the ChannelStateWriteThread. That thread is shared, so once it ends,
   subsequent checkpoints cannot proceed.
   Related code:
   - `ChannelStateWriteRequestExecutorImpl#run`
   - `ChannelStateWriteRequestExecutorImplTest#testRecordsException`
   
   _**So channelStateFuture.completeExceptionally should only cause this checkpoint to fail and should not affect the job. If this catch is removed, some ITCases fail too.**_
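   
   The effect can be modeled with a simplified worker loop. This is only a sketch,
   not `ChannelStateWriteRequestExecutorImpl`: the `Request` interface, the queue,
   and all names below are hypothetical. It counts how many later requests are
   still handled after a request for an aborted checkpoint runs.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.atomic.AtomicInteger;
   
   class SharedWriteThreadDemo {
       /** Hypothetical stand-in for a channel state write request. */
       interface Request {
           void execute() throws Exception;
       }
   
       // Models dataFuture.get() for an aborted checkpoint.
       static void getFromAbortedFuture() throws ExecutionException {
           throw new ExecutionException(new RuntimeException("checkpoint aborted"));
       }
   
       static int laterRequestsHandled(boolean catchInsideRequest) throws InterruptedException {
           AtomicInteger handled = new AtomicInteger();
           Queue<Request> queue = new ArrayDeque<>();
           // Request for the aborted checkpoint.
           queue.add(() -> {
               if (catchInsideRequest) {
                   try {
                       getFromAbortedFuture();
                   } catch (ExecutionException ignored) {
                       // Only this checkpoint is affected.
                   }
               } else {
                   getFromAbortedFuture();
               }
           });
           // Request for a later checkpoint.
           queue.add(() -> handled.incrementAndGet());
   
           Thread worker = new Thread(() -> {
               try {
                   Request request;
                   while ((request = queue.poll()) != null) {
                       request.execute();
                   }
               } catch (Exception e) {
                   // An escaped exception ends the loop; remaining requests never run.
               }
           });
           worker.start();
           worker.join();
           return handled.get();
       }
   }
   ```
   
   In this model, `laterRequestsHandled(false)` returns 0 and
   `laterRequestsHandled(true)` returns 1.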
   
   ## Solution
   ChannelStateCheckpointWriter#runWithChecks already has similar logic to keep the
   ChannelStateWriteThread from ending. When writing data throws an Exception, it
   calls fail(e) and rethrows only if no IOException is found in the exception chain.
   
   ChannelStateCheckpointWriter#runWithChecks code:
   ```java
   private void runWithChecks(RunnableWithException r) {
       try {
           checkState(!result.isDone(), "result is already completed", result);
           r.run();
       } catch (Exception e) {
           fail(e);
           if (!findThrowable(e, IOException.class).isPresent()) {
               rethrow(e);
           }
       }
   }
   ```
   
   So, here are two options:
   
   Option A: catch Exception and rethrow it unless it is, or wraps, an ExecutionException.
   
   ```java
   writer -> {
       try {
           for (Buffer buffer : dataFuture.get()) {
               checkBufferIsBuffer(buffer);
               bufferConsumer.accept(writer, buffer);
           }
       } catch (Exception e) {
           writer.fail(e);
           if (!findThrowable(e, ExecutionException.class).isPresent()) {
               rethrow(e);
           }
       }
   },
   throwable -> {
       try {
           CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
       } catch (Exception e) {
           if (!findThrowable(e, ExecutionException.class).isPresent()) {
               rethrow(e);
           }
       }
   },
   ```
   
   Option B: just catch ExecutionException
   
   ```java
   writer -> {
       try {
           for (Buffer buffer : dataFuture.get()) {
               checkBufferIsBuffer(buffer);
               bufferConsumer.accept(writer, buffer);
           }
       } catch (ExecutionException e) {
           writer.fail(e);
       }
   },
   throwable -> {
       try {
           CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
       } catch (ExecutionException ignored) {
       }
   },
   ```
   
   The difference between the two is that Option A also calls fail(e) when it catches
   an exception other than ExecutionException. I think those exceptions already
   trigger fail(e) in ChannelStateCheckpointWriter#runWithChecks, so they do not need
   to be handled here. That is why I chose Option B in this PR.
   
   ## Improved Solution
   
   Option B could still be improved. If `bufferConsumer.accept(writer, buffer);`
   throws an ExecutionException because `ChannelStateCheckpointWriter#runWithChecks`
   throws one, writer.fail(e) is called twice. So the try/catch should wrap only
   dataFuture.get().
   
   Option C:
   ```java
   writer -> {
       List<Buffer> buffers;
       try {
           buffers = dataFuture.get();
       } catch (ExecutionException e) {
           writer.fail(e);
           return;
       }
       for (Buffer buffer : buffers) {
           checkBufferIsBuffer(buffer);
           bufferConsumer.accept(writer, buffer);
       }
   },
   throwable -> {
       try {
           CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
       } catch (ExecutionException ignored) {
       }
   },
   ```
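   
   To illustrate why the scope of the try block matters, here is a rough model of
   the two shapes. It assumes a hypothetical `Writer` whose `write` reports its own
   failure via `fail` and then rethrows an ExecutionException; none of these names
   come from Flink.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   
   class TryScopeDemo {
       /** Hypothetical writer that counts fail() calls. */
       static class Writer {
           int failCalls;
   
           void fail(Throwable cause) {
               failCalls++;
           }
   
           // Models a write that reports its own failure and then rethrows.
           void write(String buffer) throws ExecutionException {
               ExecutionException e = new ExecutionException(new RuntimeException("write failed"));
               fail(e);
               throw e;
           }
       }
   
       static CompletableFuture<List<String>> data() {
           return CompletableFuture.completedFuture(Collections.singletonList("buffer"));
       }
   
       // Wide try (Option B shape): the catch also sees failures from write().
       static int failCallsWithWideTry() throws InterruptedException {
           Writer writer = new Writer();
           try {
               for (String buffer : data().get()) {
                   writer.write(buffer);
               }
           } catch (ExecutionException e) {
               writer.fail(e); // second fail() for the same failure
           }
           return writer.failCalls;
       }
   
       // Narrow try (Option C shape): only get() is guarded.
       static void narrow(Writer writer) throws InterruptedException, ExecutionException {
           List<String> buffers;
           try {
               buffers = data().get();
           } catch (ExecutionException e) {
               writer.fail(e);
               return;
           }
           for (String buffer : buffers) {
               writer.write(buffer);
           }
       }
   
       static int failCallsWithNarrowTry() throws InterruptedException {
           Writer writer = new Writer();
           try {
               narrow(writer);
           } catch (ExecutionException propagated) {
               // In this model the write failure reaches the caller.
           }
           return writer.failCalls;
       }
   }
   ```
   
   In this model the wide try records two `fail` calls and the narrow try records
   one; with the narrow try, the write failure propagates to the caller instead of
   being caught.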
   
   What do you think?



