GitHub user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195123173
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Test set user code ClassLoader before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ syncLatch = new OneShotLatch();
+
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setUserCodeClassLoader(new TestUserCodeClassLoader())
+ .build()) {
+ TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+ final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+ CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+ () -> {
+ try {
+ timerServiceTask.invoke();
+ } catch (Exception e) {
+ atomicThrowable.set(e);
+ }
+ },
+ TestingUtils.defaultExecutor());
+
+ // wait until the invoke is complete
+ invokeFuture.get();
+
+ assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
+ assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+ // check if an exception occurred
--- End diff --
Tiny nit: you could drop this comment (and the one above). The code is
pretty self-explanatory on its own.
---