XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1038080578


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
        } catch (InterruptedException ignored) { // we use interrupt to close reload thread
         } finally {
             if (cacheLoadTaskService != null) {
+                // if main cache reload thread encountered an exception,
+                // it interrupts underlying InputSplitCacheLoadTasks threads
                 cacheLoadTaskService.shutdownNow();

Review Comment:
   I just noticed that there's also `ExecutorUtils.gracefulShutdown`. Maybe it would be worth using here, since it also includes proper logging in case of an error.



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.values().stream().mapToInt(Collection::size).sum();
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();

Review Comment:
   Doing that leads to a deadlock. Even just adding a `Thread.sleep` shortly before closing the cache loader makes the test run into a deadlock, because [CacheLoader:101](https://github.com/apache/flink/blob/340b100f2de5e0d90ba475aa8a00e359a61442ce/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java#L101) acquires the `CacheLoader.reloadLock` and initiates the reloading of the cache synchronously while holding the lock. The data reload blocks due to the `CountDownLatch` we use in the test. The subsequent call of close triggers the shutdown of the `InputSplitCacheLoadTask` instances through `InputSplitCacheLoadTask#stopRunning`. But the shutdown of those tasks will never complete because the data loading is still blocked and the tasks are stuck in the [while loop](https://github.com/apache/flink/blob/340b100f2de5e0d90ba475aa8a00e359a61442ce/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java#L71). [InputFormatCacheLoader#close](https://github.com/apache/flink/blob/a024a366f73f822bb4fd35db737ac2b8177f6b25/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java#L129) will continue with calling `CacheLoader#close`, which tries to acquire the lock that's already held by the thread executing `InputFormatCacheLoader`, and we end up in a deadlock.
   
   My suspicion is that the test as it is pushed right now passes because we're calling close before the data reload is initiated. AFAIU, that's not what we want to test, is it? Please correct me if I'm wrong here.
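   
   For illustration, here is a minimal, self-contained sketch of the lock interaction described above (class and member names are simplified stand-ins, not the actual Flink code):
   ```java
   import java.util.concurrent.CountDownLatch;

   /** Simplified model of the CacheLoader locking pattern described above. */
   class DeadlockSketch {
       private final Object reloadLock = new Object();
       private final CountDownLatch dataLatch = new CountDownLatch(1);

       /** Models the reload thread: acquires the lock, then blocks in the synchronous reload. */
       void reload() throws InterruptedException {
           synchronized (reloadLock) { // lock is held for the entire reload
               dataLatch.await();      // models the reload blocked by the test's latch
           }
       }

       /** Models close(): stopping the tasks doesn't unblock them, and acquiring
        *  the lock still held by reload() blocks forever -> deadlock. */
       void close() {
           synchronized (reloadLock) {
               // cleanup would happen here, but we never get this far
           }
       }
   }
   ```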



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -131,15 +133,19 @@ void testExceptionDuringReload() throws Exception {
     }
 
     @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+    void testInterruptDuringReload() throws Exception {
+        CountDownLatch recordsProcessingLatch = new CountDownLatch(1);
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
+                () -> {
+                    try {
+                        // wait should be interrupted if everything works ok
+                        if (!recordsProcessingLatch.await(5, TimeUnit.SECONDS)) {
+                            throw new RuntimeException("timeout");
+                        }

Review Comment:
   ```suggestion
                       assertThatThrownBy(recordsProcessingLatch::await)
                               .as("wait should be interrupted if everything works ok")
                               .isInstanceOf(InterruptedException.class);
                       Thread.currentThread().interrupt(); // restore interrupted status
   ```
   I guess we could get rid of the 5 seconds here. Waiting forever enables us to generate the thread dump at the end, which gives more insight into what went wrong during the test execution.
   
   nit: I played around with the AssertJ API a bit more and utilized the comment as an assertion message. WDYT?
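   
   Applied to the full `reloadAction`, it would then read roughly like this (a sketch; the rest of the test stays unchanged):
   ```java
   Runnable reloadAction =
           () -> {
               // await() blocking forever is fine: the interrupt from close/shutdown
               // is exactly what the assertion below expects
               assertThatThrownBy(recordsProcessingLatch::await)
                       .as("wait should be interrupted if everything works ok")
                       .isInstanceOf(InterruptedException.class);
               Thread.currentThread().interrupt(); // restore interrupted status
           };
   ```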



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.values().stream().mapToInt(Collection::size).sum();
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();

Review Comment:
   ```suggestion
                               reloadActionReachedTrigger.trigger();
                               latch.await();
   ```
   Don't we have to add something like a `OneShotLatch#trigger` here and wait for it before calling `cacheLoader.close()` later in the test? That way, we make sure that we're actually at the stage where the reload tasks are instantiated before closing the loader.
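   
   Roughly the flow I have in mind (using `org.apache.flink.core.testutils.OneShotLatch`; the name `reloadActionReachedTrigger` is just a placeholder):
   ```java
   OneShotLatch reloadActionReachedTrigger = new OneShotLatch();
   Runnable reloadAction =
           ThrowingRunnable.unchecked(
                   () -> {
                       recordsCounter.incrementAndGet();
                       reloadActionReachedTrigger.trigger(); // signal: reload tasks are running
                       latch.await();
                   });
   // ... open the loader and submit it to the executor as before ...
   reloadActionReachedTrigger.await(); // make sure the reload actually started
   cacheLoader.close();
   latch.countDown();
   ```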



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
        } catch (InterruptedException ignored) { // we use interrupt to close reload thread
         } finally {
             if (cacheLoadTaskService != null) {
+                // if main cache reload thread encountered an exception,
+                // it interrupts underlying InputSplitCacheLoadTasks threads
                 cacheLoadTaskService.shutdownNow();

Review Comment:
   Yeah, I was just curious about your opinion on the usage of the common pool. It felt like implementing it asynchronously would have worked here. I just struggled to think of a better way to size the thread pool based on the number of `InputSplits` to optimize the resource utilization per cache loading cycle.
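   
   For the record, deriving the pool size from the splits is what I had in mind, roughly like this (a sketch only; `maxConcurrentLoadTasks` and `minNumSplits` are made-up placeholders):
   ```java
   // cap the pool size by the number of splits so a small reload
   // doesn't allocate more threads than it can actually use
   InputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits);
   int poolSize = Math.max(1, Math.min(inputSplits.length, maxConcurrentLoadTasks));
   ExecutorService cacheLoadTaskService = Executors.newFixedThreadPool(poolSize);
   ```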



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.size() + 1; // 1 key with 2 records
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();
+                        });
+        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+        cacheLoader.open(metricGroup);
 
         // check closing
-        executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cacheLoader);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cacheLoader);
         cacheLoader.close();
-        assertThatNoException().isThrownBy(future::get); // wait for the end
+        latch.countDown();
+        future.get(); // wait for the end
+        executorService.shutdown();

Review Comment:
   Probably, you're right. I checked how we dealt with threads in other cases. But since the future itself has already completed, it should be fine. My initial concern was that the thread itself doesn't finalize properly before the test passes and that it could linger around while other tests are executed. But it looks like that's not an issue.
   
   What we could do, though, is add a try/finally block to make sure that shutdown is also triggered if something else causes an exception to be thrown, so the thread is still shut down properly in that case.
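   
   i.e. something like this (a sketch, reusing the names from the diff above):
   ```java
   ExecutorService executorService = Executors.newSingleThreadExecutor();
   try {
       Future<?> future = executorService.submit(cacheLoader);
       cacheLoader.close();
       latch.countDown();
       future.get(); // wait for the end
   } finally {
       // runs even if close() or get() above throws
       executorService.shutdown();
   }
   ```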



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
