GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195125463
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
         }
     }
+    /**
+     * Test set user code ClassLoader before calling ProcessingTimeCallback.
+     */
+    @Test
+    public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+        syncLatch = new OneShotLatch();
+
+        try (MockEnvironment mockEnvironment =
+            new MockEnvironmentBuilder()
+                .setUserCodeClassLoader(new TestUserCodeClassLoader())
+                .build()) {
+            TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+            final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+            CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        timerServiceTask.invoke();
+                    } catch (Exception e) {
+                        atomicThrowable.set(e);
--- End diff --
You can fail this by throwing a `CompletionException` that wraps `e` instead;
then we don't need the atomic reference, and the test will fail at
`invokeFuture.get`.
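
A minimal sketch of the suggestion, using only the JDK. The class name `CompletionExceptionSketch` and the helpers `invokeTask`, `runTask`, and `failureCause` are hypothetical stand-ins (in particular, `invokeTask` replaces `timerServiceTask.invoke()`); this is not the code from the PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class CompletionExceptionSketch {

    /** Hypothetical stand-in for timerServiceTask.invoke(); always throws a checked exception. */
    static void invokeTask() throws Exception {
        throw new Exception("task failed");
    }

    /** Runs the task asynchronously; a checked exception is rethrown as CompletionException. */
    static CompletableFuture<Void> runTask() {
        return CompletableFuture.runAsync(() -> {
            try {
                invokeTask();
            } catch (Exception e) {
                // Completes the future exceptionally; no AtomicReference is needed.
                throw new CompletionException(e);
            }
        });
    }

    /** Returns the cause reported by get(), or null if the future completed normally. */
    static Throwable failureCause() {
        try {
            runTask().get();
            return null;
        } catch (ExecutionException e) {
            // get() unwraps the CompletionException and reports the original exception as the cause.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failureCause().getMessage()); // prints "task failed"
    }
}
```

In a test, calling `invokeFuture.get()` directly would surface the failure as an `ExecutionException`, so the separate check on the atomic reference can be dropped.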
---