XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1053127950


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -57,7 +57,7 @@ public synchronized void open(CacheMetricGroup metricGroup) {
         }
         metricGroup.hitCounter(hitCounter);
         metricGroup.missCounter(new SimpleCounter()); // always zero
-        cacheLoader.open(metricGroup);
+        cacheLoader.initializeMetrics(metricGroup);
     }
 
     public synchronized void open(Configuration parameters) throws Exception {

Review Comment:
   There's the same naming issue with the `open` methods in `LookupFullCache`. 
It doesn't help readability if we have the same method name with different 
signatures and different purposes. What about renaming `open(Configuration)` 
to something like `initializeReloadTrigger(Configuration)`? :thinking: 



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -86,20 +92,21 @@ public FullCacheTestInputFormat(
         this.projectable = generatedProjection.isPresent();
         this.generatedProjection = generatedProjection.orElse(null);
         this.rowConverter = rowConverter;
-        this.deltaNumSplits = 0;
+        this.numSplits = DEFAULT_NUM_SPLITS;
+        this.deltaNumSplits = DEFAULT_DELTA_NUM_SPLITS;

Review Comment:
   You could call `this(dataRows, generatedProjection, rowConverter, 
DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS)` instead of initializing all 
fields here once more.



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -77,43 +77,46 @@ void checkCounter() {
     @ParameterizedTest
     @MethodSource("deltaNumSplits")
     void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
-        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
-        cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
-        cacheLoader.run();
-        ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
-        assertCacheContent(cache);
-        cacheLoader.run();
-        assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance 
of cache after reload
-        cacheLoader.close();
-        assertThat(cacheLoader.getCache().size()).isZero(); // cache is 
cleared after close
+        try (InputFormatCacheLoader cacheLoader = 
createCacheLoader(deltaNumSplits)) {
+            
cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
+            run(cacheLoader);
+            ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
+            assertCacheContent(cache);
+            run(cacheLoader);
+            // new instance of cache after reload
+            assertThat(cacheLoader.getCache()).isNotSameAs(cache);
+            cacheLoader.close();

Review Comment:
   ```suggestion
   ```
   This is handled by the try block



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
                 () -> {
                     throw exception;
                 };
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
-        InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-        assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(DEFAULT_NUM_SPLITS, 
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+            InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
+            cacheLoader.initializeMetrics(metricGroup);
+            assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+            
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        }
     }
 
-    @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to 
number of all rows
+    /**
+     * Cache loader creates additional threads in case of multiple input 
splits. In both cases cache
+     * loader must correctly react on close and interrupt all threads.
+     */
+    @ParameterizedTest
+    @MethodSource("numSplits")
+    void testCloseDuringReload(int numSplits) throws Exception {
+        OneShotLatch reloadLatch = new OneShotLatch();
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
+                () -> {
+                    reloadLatch.trigger();
+                    assertThatThrownBy(() -> new OneShotLatch().await())
+                            .as("Wait should be interrupted if everything 
works ok")
+                            .isInstanceOf(InterruptedException.class);
+                    Thread.currentThread().interrupt(); // restore interrupted 
status
+                };
         InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-
-        // check interruption
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cacheLoader);
-        executorService.shutdownNow(); // internally interrupts a thread
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on 
interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        CompletableFuture<Void> future;
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, 
reloadAction)) {
+            cacheLoader.initializeMetrics(metricGroup);
+            future = cacheLoader.reloadAsync();
+            reloadLatch.await(5000, TimeUnit.MILLISECONDS);
+        }
+        // try-with-resources calls #close, which should wait for the end of 
reload
+        assertThat(future.isDone()).isTrue();
+        assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
-
-        // check closing
-        executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cacheLoader);
-        cacheLoader.close();
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on closing
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    static Stream<Arguments> numSplits() {
+        return Stream.of(Arguments.of(1), Arguments.of(2));

Review Comment:
   AFAIU, `numSplits` being `1` means that there is no extra fixed-size thread 
pool created in the `InputFormatCacheLoader`, but for `2` we expect additional 
threads to be created. Is this considered an implementation detail, or should 
we test this feature through some `@VisibleForTesting` utility method in 
`InputFormatCacheLoader` that returns the thread pool (which would be `null` 
in the case of `numSplits=1`)? :thinking: 



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -77,43 +77,46 @@ void checkCounter() {
     @ParameterizedTest
     @MethodSource("deltaNumSplits")
     void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
-        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
-        cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
-        cacheLoader.run();
-        ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
-        assertCacheContent(cache);
-        cacheLoader.run();
-        assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance 
of cache after reload
-        cacheLoader.close();
-        assertThat(cacheLoader.getCache().size()).isZero(); // cache is 
cleared after close
+        try (InputFormatCacheLoader cacheLoader = 
createCacheLoader(deltaNumSplits)) {
+            
cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
+            run(cacheLoader);
+            ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
+            assertCacheContent(cache);
+            run(cacheLoader);
+            // new instance of cache after reload
+            assertThat(cacheLoader.getCache()).isNotSameAs(cache);

Review Comment:
   ```suggestion
               assertThat(cacheLoader.getCache()).as("A new instance of the 
cached data should be loaded.").isNotSameAs(cache);
   ```
   nit: just as another way to document tests in the future: You could use the 
assert message for documentation purposes. This adds more value as it describes 
the test during execution as well.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -109,7 +109,7 @@ public long size() {
 
     @Override
     public void close() throws Exception {
-        reloadTrigger.close(); // firstly try to interrupt reload thread
+        reloadTrigger.close();
         cacheLoader.close();

Review Comment:
   ```suggestion
           reloadTrigger.close();
           cacheLoader.close();
   ```
   Just as a discussion item: Should we add the purpose of each of the calls as 
comments? Or is this considered an implementation detail? AFAIU, the first one 
shuts down the scheduled thread pool for triggering an async cache loading. The 
second one shuts down the cache loading (fixed-size) thread pool.



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
                 () -> {
                     throw exception;
                 };
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
-        InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-        assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(DEFAULT_NUM_SPLITS, 
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+            InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
+            cacheLoader.initializeMetrics(metricGroup);
+            assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+            
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        }
     }
 
-    @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to 
number of all rows
+    /**
+     * Cache loader creates additional threads in case of multiple input 
splits. In both cases cache
+     * loader must correctly react on close and interrupt all threads.
+     */
+    @ParameterizedTest
+    @MethodSource("numSplits")
+    void testCloseDuringReload(int numSplits) throws Exception {
+        OneShotLatch reloadLatch = new OneShotLatch();
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
+                () -> {
+                    reloadLatch.trigger();
+                    assertThatThrownBy(() -> new OneShotLatch().await())
+                            .as("Wait should be interrupted if everything 
works ok")
+                            .isInstanceOf(InterruptedException.class);
+                    Thread.currentThread().interrupt(); // restore interrupted 
status
+                };
         InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-
-        // check interruption
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cacheLoader);
-        executorService.shutdownNow(); // internally interrupts a thread
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on 
interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        CompletableFuture<Void> future;
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, 
reloadAction)) {
+            cacheLoader.initializeMetrics(metricGroup);
+            future = cacheLoader.reloadAsync();
+            reloadLatch.await(5000, TimeUnit.MILLISECONDS);
+        }
+        // try-with-resources calls #close, which should wait for the end of 
reload
+        assertThat(future.isDone()).isTrue();
+        assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1);

Review Comment:
   That's kind of unexpected: We're closing the loader by interrupting the 
threads executing `InputSplitCacheLoadTask`. Is this actually where we want to 
consider the cache loading to be a success? :thinking: 



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
                 () -> {
                     throw exception;
                 };
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
-        InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-        assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(DEFAULT_NUM_SPLITS, 
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+            InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
+            cacheLoader.initializeMetrics(metricGroup);
+            assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+            
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        }
     }
 
-    @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to 
number of all rows
+    /**
+     * Cache loader creates additional threads in case of multiple input 
splits. In both cases cache
+     * loader must correctly react on close and interrupt all threads.
+     */
+    @ParameterizedTest
+    @MethodSource("numSplits")
+    void testCloseDuringReload(int numSplits) throws Exception {
+        OneShotLatch reloadLatch = new OneShotLatch();
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
+                () -> {
+                    reloadLatch.trigger();
+                    assertThatThrownBy(() -> new OneShotLatch().await())
+                            .as("Wait should be interrupted if everything 
works ok")
+                            .isInstanceOf(InterruptedException.class);
+                    Thread.currentThread().interrupt(); // restore interrupted 
status
+                };
         InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-
-        // check interruption
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cacheLoader);
-        executorService.shutdownNow(); // internally interrupts a thread
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on 
interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        CompletableFuture<Void> future;
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, 
reloadAction)) {
+            cacheLoader.initializeMetrics(metricGroup);
+            future = cacheLoader.reloadAsync();
+            reloadLatch.await(5000, TimeUnit.MILLISECONDS);

Review Comment:
   ```suggestion
               reloadLatch.await();
   ```
   We agreed on not using timeouts in unit tests. This enables us to retrieve 
the thread dump at the end of the test suite if a test actually times out (see 
[docs](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests)).



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,58 +72,58 @@ public void open(Configuration parameters) throws Exception 
{
     }
 
     @Override
-    protected void reloadCache() throws Exception {
+    protected void updateCache() throws Exception {
         InputSplit[] inputSplits = createInputSplits();
         int numSplits = inputSplits.length;
+        int concurrencyLevel = getConcurrencyLevel(numSplits);
         // load data into the another copy of cache
-        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking
+        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking;
         // cache has default initialCapacity and loadFactor, but overridden 
concurrencyLevel
         ConcurrentHashMap<RowData, Collection<RowData>> newCache =
-                new ConcurrentHashMap<>(16, 0.75f, 
getConcurrencyLevel(numSplits));
-        this.cacheLoadTasks =
+                new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+        Deque<InputSplitCacheLoadTask> cacheLoadTasks =
                 Arrays.stream(inputSplits)
                         .map(split -> createCacheLoadTask(split, newCache))
-                        .collect(Collectors.toList());
-        if (isStopped) {
-            // check for cases when #close was called during reload before 
creating cacheLoadTasks
-            return;
-        }
-        // run first task or create numSplits threads to run all tasks
+                        .collect(Collectors.toCollection(ArrayDeque::new));
+        // run first task and create concurrencyLevel - 1 threads to run 
remaining tasks
         ExecutorService cacheLoadTaskService = null;
         try {
-            if (numSplits > 1) {
-                int numThreads = getConcurrencyLevel(numSplits);
-                cacheLoadTaskService = 
Executors.newFixedThreadPool(numThreads);
-                ExecutorService finalCacheLoadTaskService = 
cacheLoadTaskService;
-                List<Future<?>> futures =
+            InputSplitCacheLoadTask firstTask = cacheLoadTasks.pop();
+            CompletableFuture<?> otherTasksFuture = null;
+            if (!cacheLoadTasks.isEmpty()) {
+                cacheLoadTaskService = 
Executors.newFixedThreadPool(concurrencyLevel - 1);
+                ExecutorService finalExecutor = cacheLoadTaskService;

Review Comment:
   What's the purpose of creating a local variable here that refers to 
`cacheLoadTaskService`? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to