1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r878825033
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -103,18 +104,21 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
checkBufferIsBuffer(buffer);
bufferConsumer.accept(writer, buffer);
}
- } catch (Throwable e) {
+ } catch (ExecutionException e) {
writer.fail(e);
}
},
throwable -> {
- List<Buffer> buffers = dataFuture.get();
- if (buffers == null || buffers.isEmpty()) {
- return;
+ try {
+ List<Buffer> buffers = dataFuture.get();
+ if (buffers == null || buffers.isEmpty()) {
+ return;
+ }
+ CloseableIterator<Buffer> iterator =
+ CloseableIterator.fromList(buffers,
Buffer::recycleBuffer);
+ iterator.close();
+ } catch (ExecutionException ignored) {
Review Comment:
Hi @pnowojski,
## Conclusion first: we must add the catch here.
When a checkpoint is aborted, ChannelStateFuture.completeExceptionally(e) is
executed, and dataFuture.get() then throws an ExecutionException.
Without a catch here, the action throws that ExecutionException, which causes
the ChannelStateWriteThread to end. The ChannelStateWriteThread is shared, so
once it ends, subsequent CPs cannot continue.
Related code:
- `ChannelStateWriteRequestExecutorImpl#run`
- `ChannelStateWriteRequestExecutorImplTest#testRecordsException`
_**So channelStateFuture.completeExceptionally should only cause this CP to
fail and should not affect the job. If we remove this catch, some ITCases
fail as well.**_
## Solution
ChannelStateCheckpointWriter#runWithChecks already has similar logic to keep
the ChannelStateWriteThread from ending. When writing data throws an Exception,
it calls fail(e) and rethrows only if the cause chain contains no IOException.
ChannelStateCheckpointWriter#runWithChecks code:
```java
private void runWithChecks(RunnableWithException r) {
    try {
        checkState(!result.isDone(), "result is already completed", result);
        r.run();
    } catch (Exception e) {
        fail(e);
        if (!findThrowable(e, IOException.class).isPresent()) {
            rethrow(e);
        }
    }
}
```
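The "fail, then rethrow unless an IOException is in the cause chain" rule can be tried out in isolation. The sketch below is a simplified illustration, not Flink's code: `findInChain` is a hypothetical stand-in for the `findThrowable` helper used above, and `swallowed` only reports whether the rule would suppress the rethrow.

```java
import java.io.IOException;
import java.util.Optional;

final class RunWithChecksSketch {

    /** Hypothetical stand-in: returns the first throwable of the given type in the cause chain. */
    static <T extends Throwable> Optional<T> findInChain(Throwable t, Class<T> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    /** True if the exception would be swallowed (not rethrown) under the IOException rule. */
    static boolean swallowed(Exception e) {
        return findInChain(e, IOException.class).isPresent();
    }

    public static void main(String[] args) {
        // An IOException wrapped in another exception is still found in the chain.
        System.out.println(swallowed(new RuntimeException(new IOException("disk full")))); // prints true
        // No IOException anywhere in the chain, so this one would be rethrown.
        System.out.println(swallowed(new IllegalStateException("bad state"))); // prints false
    }
}
```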
So there are two options:
Option A: catch Exception, and rethrow it if it is not an ExecutionException.
```java
writer -> {
    try {
        for (Buffer buffer : dataFuture.get()) {
            checkBufferIsBuffer(buffer);
            bufferConsumer.accept(writer, buffer);
        }
    } catch (Exception e) {
        writer.fail(e);
        if (!findThrowable(e, ExecutionException.class).isPresent()) {
            rethrow(e);
        }
    }
},
throwable -> {
    try {
        CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
    } catch (Exception e) {
        if (!findThrowable(e, ExecutionException.class).isPresent()) {
            rethrow(e);
        }
    }
},
```
Option B: just catch ExecutionException
```java
writer -> {
    try {
        for (Buffer buffer : dataFuture.get()) {
            checkBufferIsBuffer(buffer);
            bufferConsumer.accept(writer, buffer);
        }
    } catch (ExecutionException e) {
        writer.fail(e);
    }
},
throwable -> {
    try {
        CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
    } catch (ExecutionException ignored) {
    }
},
```
The difference between the two is that Option A also calls fail(e) when it
catches an exception other than ExecutionException. I think those exceptions
have already triggered fail(e) in ChannelStateCheckpointWriter#runWithChecks,
so they don't need to be handled here. That is why I chose Option B in this PR.
## Improved Solution
Option B could still be improved. If `bufferConsumer.accept(writer, buffer);`
throws an ExecutionException because
`ChannelStateCheckpointWriter#runWithChecks` threw it, writer.fail(e) is called
twice: once in runWithChecks and once in the catch block here. So the try/catch
should only wrap dataFuture.get().
Option C:
```java
writer -> {
    List<Buffer> buffers;
    try {
        buffers = dataFuture.get();
    } catch (ExecutionException e) {
        writer.fail(e);
        return;
    }
    for (Buffer buffer : buffers) {
        checkBufferIsBuffer(buffer);
        bufferConsumer.accept(writer, buffer);
    }
},
throwable -> {
    try {
        CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
    } catch (ExecutionException ignored) {
    }
},
```
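The double call to fail(e) can be reproduced with a small standalone sketch. Everything in it is hypothetical and simplified: `CountingWriter` is not a Flink class, buffers are plain strings, and `write` only mimics the "call fail, then rethrow" shape of runWithChecks under the assumption that the rethrown exception is an `ExecutionException`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class DoubleFailDemo {

    /** Hypothetical writer that counts fail() calls and fails on every write. */
    static final class CountingWriter {
        int failCalls = 0;

        void fail(Throwable e) {
            failCalls++;
        }

        // Mimics the shape of runWithChecks: record the failure, then rethrow.
        void write(String buffer) throws ExecutionException {
            ExecutionException e =
                    new ExecutionException(new IllegalStateException("write failed: " + buffer));
            fail(e);
            throw e;
        }
    }

    // Same shape as Option B: the try block covers both get() and the writes.
    static void optionB(CountingWriter writer, CompletableFuture<List<String>> dataFuture)
            throws InterruptedException {
        try {
            for (String buffer : dataFuture.get()) {
                writer.write(buffer);
            }
        } catch (ExecutionException e) {
            writer.fail(e);
        }
    }

    // Same shape as Option C: the try block covers only get().
    static void optionC(CountingWriter writer, CompletableFuture<List<String>> dataFuture)
            throws Exception {
        List<String> buffers;
        try {
            buffers = dataFuture.get();
        } catch (ExecutionException e) {
            writer.fail(e);
            return;
        }
        for (String buffer : buffers) {
            writer.write(buffer);
        }
    }

    /** Runs one variant against a successfully completed future and returns the fail() count. */
    static int failCalls(boolean useOptionC) {
        CountingWriter writer = new CountingWriter();
        CompletableFuture<List<String>> dataFuture =
                CompletableFuture.completedFuture(Arrays.asList("b1", "b2"));
        try {
            if (useOptionC) {
                optionC(writer, dataFuture);
            } else {
                optionB(writer, dataFuture);
            }
        } catch (Exception e) {
            // With the Option C shape the write failure propagates to the caller.
        }
        return writer.failCalls;
    }

    public static void main(String[] args) {
        System.out.println(failCalls(false)); // Option B shape: prints 2
        System.out.println(failCalls(true));  // Option C shape: prints 1
    }
}
```

In this sketch the narrower try block in the Option C shape keeps fail(e) to a single call, at the cost of letting the write failure propagate to the caller.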
What do you think?