WencongLiu commented on code in PR #23781:
URL: https://github.com/apache/flink/pull/23781#discussion_r1403015620


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java:
##########
@@ -257,6 +265,25 @@ public void go() throws Exception {
         assertThat(waitFuture2).isDone();
     }
 
+    /**
+     * The executor service for batch shuffle read shouldn't throw {@link
+     * RejectedExecutionException} if the worker thread is attempting to add 
new jobs when the
+     * service is already shutdown. This behavior is important to ensure 
graceful handling of
+     * scenarios where worker threads couldn't be immediately aware of the 
TaskManager's shutdown
+     * status.
+     */
+    @Test
+    void testRejectedExecutionIsIgnoredOnShutdown() {
+        TestingNettyConnectionWriter nettyConnectionWriter =
+                new TestingNettyConnectionWriter.Builder().build();
+        diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, 
nettyConnectionWriter);
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+        assertThatNoException().isThrownBy(() -> ioExecutor.trigger());
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+        ioExecutor.shutdown();
+        assertThatNoException().isThrownBy(() -> ioExecutor.trigger());

Review Comment:
   Good point! 🤔 After the `ManuallyTriggeredScheduledExecutorService` is 
replaced by `TestingScheduledExecutorService` that could throw 
`RejectedExecutionException`, a fatal error will happen before pr's modication 
in this test.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java:
##########
@@ -257,6 +265,25 @@ public void go() throws Exception {
         assertThat(waitFuture2).isDone();
     }
 
+    /**
+     * The executor service for batch shuffle read shouldn't throw {@link
+     * RejectedExecutionException} if the worker thread is attempting to add 
new jobs when the
+     * service is already shutdown. This behavior is important to ensure 
graceful handling of
+     * scenarios where worker threads couldn't be immediately aware of the 
TaskManager's shutdown
+     * status.
+     */
+    @Test
+    void testRejectedExecutionIsIgnoredOnShutdown() {
+        TestingNettyConnectionWriter nettyConnectionWriter =
+                new TestingNettyConnectionWriter.Builder().build();
+        diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, 
nettyConnectionWriter);
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+        assertThatNoException().isThrownBy(() -> ioExecutor.trigger());
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+        ioExecutor.shutdown();
+        assertThatNoException().isThrownBy(() -> ioExecutor.trigger());

Review Comment:
   Good point! 🤔 After the `ManuallyTriggeredScheduledExecutorService` is 
replaced by `TestingScheduledExecutorService` that could throw 
`RejectedExecutionException`, a fatal error will happen before pr's modication 
in this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to