a-siuniaev commented on a change in pull request #9767: 
[FLINK-14120][API/Datastream] Fix testImmediateShutdown
URL: https://github.com/apache/flink/pull/9767#discussion_r328582079
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 ##########
 @@ -154,27 +152,25 @@ public void testImmediateShutdown() throws Exception {
                        // the task should trigger immediately and should block 
until terminated with interruption
                        timer.registerTimer(System.currentTimeMillis(), 
timestamp -> {
                                synchronized (lock) {
-                                       latch.trigger();
-                                       Thread.sleep(100000000);
+                                       try {
+                                               latch.trigger();
+                                               Thread.sleep(100000000);
+                                               fail("should be interrupted");
+                                       } catch (InterruptedException ignored) {
+                                       }
                                }
                        });
 
                        latch.await();
                        timer.shutdownService();
+
+                       // can only enter this scope after the task is 
interrupted
                        synchronized (lock) {
                                assertTrue(timer.isTerminated());
-
-                               // The shutdownService() may not necessary wait 
for active tasks to finish properly.
-                               // From the ScheduledThreadPoolExecutor Java 
docs:
-                               //      There are no guarantees beyond 
best-effort attempts to stop processing actively executing tasks.
-                               //      This implementation cancels tasks via 
{@link Thread#interrupt}, so any task that
-                               //      fails to respond to interrupts may 
never terminate.
-                               assertThat(errorRef.get(), 
is(anyOf(nullValue(), instanceOf(InterruptedException.class))));
 
 Review comment:
   Well, the problem is related to `synchronized` blocks: as soon as the sleep 
is interrupted, the monitor in the `synchronized` block in the task (line 154) 
is released **immediately**  allowing the main thread enter it's `synchronized` 
block (line 168) not waiting for exception being handled inside the pool 
thread. So sometimes when the main thread reaches assertion on line 172, the 
pool thread is still not finished with putting `InterruptedException` inside 
`errorRef`. Thus `is(anyOf...` assertion is needed.
    
   The main problem is that sometimes (very rarely) the exception handling 
inside the pool thread is so slow, that `errorRef` gets updated with 
`InterruptedException` **after** line 176 (`errorRef.set(null);`) and thus 
making the assertion on line 196 fail. 
   
   My changes ensure that the exception will be handled inside the task in the 
`synchronized` block thus releasing the monitor only after exception handling 
is finished. In this case the `synchronized` blocks will work as expected (as I 
see it) and the two threads will be really synchronized. This means that 
`errorRef` will always contain null. That's why I removed  `is(anyOf...` 
assertion here.
   
   Another possible solution is to leave everything as it was and only change 
the following assertions (on line 196 and the next one) to `is(anyOf...` type. 
But in this case I don't really understand what the intention of 'synchronized` 
blocks is - because they doesn't really help with thread synchronization... 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to