SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1089061320
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** TestInputFormat that reads data from (2 + delta) splits which share the same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share the same {@code queue}.
Review Comment:
Yes, you are right. I have changed the Javadoc.
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +127,48 @@ void testExceptionDuringReload() throws Exception {
() -> {
throw exception;
};
- InputFormatCacheLoader cacheLoader = createCacheLoader(0,
reloadAction);
- InterceptingCacheMetricGroup metricGroup = new
InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
- assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
- assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(DEFAULT_NUM_SPLITS,
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ InterceptingCacheMetricGroup metricGroup = new
InterceptingCacheMetricGroup();
+ cacheLoader.initializeMetrics(metricGroup);
+ assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+ assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0);
+
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ }
}
- @Test
- void testCloseAndInterruptDuringReload() throws Exception {
- AtomicInteger sleepCounter = new AtomicInteger(0);
- int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to
number of all rows
+ /**
+ * Cache loader creates additional threads in case of multiple input splits. In both cases cache
+ * loader must correctly react on close and interrupt all threads.
+ */
+ @ParameterizedTest
+ @MethodSource("numSplits")
+ void testCloseDuringReload(int numSplits) throws Exception {
+ OneShotLatch reloadLatch = new OneShotLatch();
Runnable reloadAction =
- ThrowingRunnable.unchecked(
- () -> {
- sleepCounter.incrementAndGet();
- Thread.sleep(1000);
- });
- InputFormatCacheLoader cacheLoader = createCacheLoader(0,
reloadAction);
+ () -> {
+ reloadLatch.trigger();
+ assertThatThrownBy(() -> new OneShotLatch().await())
+ .as("Wait should be interrupted if everything
works ok")
+ .isInstanceOf(InterruptedException.class);
+ Thread.currentThread().interrupt(); // restore interrupted
status
+ };
InterceptingCacheMetricGroup metricGroup = new
InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
-
- // check interruption
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cacheLoader);
- executorService.shutdownNow(); // internally interrupts a thread
- assertThatNoException().isThrownBy(future::get); // wait for the end
- // check that we didn't process all elements, but reacted on
interruption
- assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ CompletableFuture<Void> future;
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS,
reloadAction)) {
+ cacheLoader.initializeMetrics(metricGroup);
+ future = cacheLoader.reloadAsync();
+ reloadLatch.await();
+ }
+ // try-with-resources calls #close, which should wait for the end of reload
+ assertThat(future.isDone()).isTrue();
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]