reswqa commented on code in PR #23781:
URL: https://github.com/apache/flink/pull/23781#discussion_r1402971255
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -302,18 +299,23 @@ private void triggerScheduling() {
<= maxRequestedBuffers
&& numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
isRunning = true;
- ioExecutor.execute(
- () -> {
- try {
- run();
- } catch (Throwable throwable) {
- LOG.error("Failed to read data.", throwable);
- // handle un-expected exception as unhandledExceptionHandler is not
- // worked for ScheduledExecutorService.
- FatalExitExceptionHandler.INSTANCE.uncaughtException(
- Thread.currentThread(), throwable);
- }
- });
+ try {
+ ioExecutor.execute(
+ () -> {
+ try {
+ run();
+ } catch (Throwable throwable) {
+ LOG.error("Failed to read data.", throwable);
+ // handle un-expected exception as unhandledExceptionHandler is
+ // not
+ // worked for ScheduledExecutorService.
Review Comment:
Why is this on a new line?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java:
##########
@@ -257,6 +265,25 @@ public void go() throws Exception {
assertThat(waitFuture2).isDone();
}
+ /**
+ * The executor service for batch shuffle read shouldn't throw {@link
+ * RejectedExecutionException} if the worker thread is attempting to add new jobs when the
+ * service is already shutdown. This behavior is important to ensure graceful handling of
+ * scenarios where worker threads couldn't be immediately aware of the TaskManager's shutdown
+ * status.
+ * status.
+ */
+ @Test
+ void testRejectedExecutionIsIgnoredOnShutdown() {
+ TestingNettyConnectionWriter nettyConnectionWriter =
+ new TestingNettyConnectionWriter.Builder().build();
+ diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, nettyConnectionWriter);
+ assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+ assertThatNoException().isThrownBy(() -> ioExecutor.trigger());
+ assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+ ioExecutor.shutdown();
+ assertThatNoException().isThrownBy(() -> ioExecutor.trigger());
Review Comment:
Have you verified that this test would actually hit a fatal error here without the changes in this PR?
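
For context, here is a minimal standalone sketch of the JDK behavior this test relies on. It is not taken from the PR: the class name `RejectedAfterShutdownSketch` and the helper `executeIgnoringShutdown` are hypothetical, and the assumption that the new outer `try` catches `RejectedExecutionException` only when the executor is already shut down is a guess from the test's Javadoc, since the catch block is not visible in the hunk above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedAfterShutdownSketch {

    /** Returns true if a plain JDK scheduled executor rejects work after shutdown(). */
    static boolean isRejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.shutdown();
        try {
            // The default rejection policy (AbortPolicy) throws here.
            executor.execute(() -> {});
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    /**
     * Hypothetical guard: submits the task and swallows the rejection only when the
     * executor has been shut down. Returns true if the task was accepted.
     */
    static boolean executeIgnoringShutdown(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            if (executor.isShutdown()) {
                // Shutdown race: nothing left to schedule, so ignore the rejection.
                return false;
            }
            // Any other rejection is unexpected and should still surface.
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + isRejectedAfterShutdown());

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        System.out.println(
                "accepted with guard: " + executeIgnoringShutdown(executor, () -> {}));
    }
}
```

If `ioExecutor.trigger()` on the testing executor does not reproduce this rejection path after `shutdown()`, the assertion would pass both with and without the fix, which is the concern behind the question above.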