XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1053184018


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
                 () -> {
                     throw exception;
                 };
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
-        InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-        assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(DEFAULT_NUM_SPLITS, 
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+            InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
+            cacheLoader.initializeMetrics(metricGroup);
+            assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+            
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        }
     }
 
-    @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to 
number of all rows
+    /**
+     * Cache loader creates additional threads in case of multiple input 
splits. In both cases cache
+     * loader must correctly react on close and interrupt all threads.
+     */
+    @ParameterizedTest
+    @MethodSource("numSplits")
+    void testCloseDuringReload(int numSplits) throws Exception {
+        OneShotLatch reloadLatch = new OneShotLatch();
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
+                () -> {
+                    reloadLatch.trigger();
+                    assertThatThrownBy(() -> new OneShotLatch().await())
+                            .as("Wait should be interrupted if everything 
works ok")
+                            .isInstanceOf(InterruptedException.class);
+                    Thread.currentThread().interrupt(); // restore interrupted 
status
+                };
         InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-
-        // check interruption
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cacheLoader);
-        executorService.shutdownNow(); // internally interrupts a thread
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on 
interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        CompletableFuture<Void> future;
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, 
reloadAction)) {
+            cacheLoader.initializeMetrics(metricGroup);
+            future = cacheLoader.reloadAsync();
+            reloadLatch.await(5000, TimeUnit.MILLISECONDS);

Review Comment:
   ```suggestion
               reloadLatch.await();
   ```
   The Flink community agreed on not using timeouts in unit tests (see 
[docs](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests)).
 This enables us to retrieve the thread dump at the end of the test suite if a 
test actually times out. Therefore, removing the timeout here should be fine. 
:-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to