Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5962#discussion_r188006850
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
---
@@ -504,7 +475,101 @@ public void testShutdownAndWaitPending() {
Assert.fail("Unexpected interruption.");
}
- Assert.assertTrue(check.get());
+ Assert.assertTrue(timerExecutionFinished.get());
+ Assert.assertTrue(timeService.isTerminated());
+ }
+
+ @Test
+ public void testShutdownServiceUninterruptible() {
+ final Object lock = new Object();
+ final OneShotLatch blockUntilTriggered = new OneShotLatch();
+ final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+ final SystemProcessingTimeService timeService =
+ createBlockingSystemProcessingTimeService(lock,
blockUntilTriggered, timerFinished);
+
+ Assert.assertFalse(timeService.isTerminated());
+
+ final Thread interruptTarget = Thread.currentThread();
+ final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+ final Thread interruptCallerThread = new Thread(() -> {
+ while (runInterrupts.get()) {
+ interruptTarget.interrupt();
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ });
+
+ interruptCallerThread.start();
+
+ final long timeoutMs = 1000L;
--- End diff --
👍
---