XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1085241002


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +127,48 @@ void testExceptionDuringReload() throws Exception {
                 () -> {
                     throw exception;
                 };
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
-        InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-        assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(DEFAULT_NUM_SPLITS, 
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+            InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
+            cacheLoader.initializeMetrics(metricGroup);
+            assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+            assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0);
+            
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        }
     }
 
-    @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to 
number of all rows
+    /**
+     * Cache loader creates additional threads in case of multiple input 
splits. In both cases cache
+     * loader must correctly react on close and interrupt all threads.
+     */
+    @ParameterizedTest
+    @MethodSource("numSplits")
+    void testCloseDuringReload(int numSplits) throws Exception {
+        OneShotLatch reloadLatch = new OneShotLatch();
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
+                () -> {
+                    reloadLatch.trigger();
+                    assertThatThrownBy(() -> new OneShotLatch().await())
+                            .as("Wait should be interrupted if everything 
works ok")
+                            .isInstanceOf(InterruptedException.class);
+                    Thread.currentThread().interrupt(); // restore interrupted 
status
+                };
         InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-
-        // check interruption
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cacheLoader);
-        executorService.shutdownNow(); // internally interrupts a thread
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on 
interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        CompletableFuture<Void> future;
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, 
reloadAction)) {
+            cacheLoader.initializeMetrics(metricGroup);
+            future = cacheLoader.reloadAsync();
+            reloadLatch.await();
+        }
+        // try-with-resources calls #close, which should wait for the end of 
reload
+        assertThat(future.isDone()).isTrue();

Review Comment:
   ```suggestion
           // try-with-resources calls #close which will interrupt any running 
threads, which should wait for the end of reload
           assertThat(future.isDone()).as("The reload future should still 
complete successfully indicating that the reload was intentionally stopped 
without an error.").isTrue();
   ```
   Is this correct? At least that's how I understand this test. :thinking: Feel 
free to change/revert



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -57,25 +59,28 @@ public synchronized void open(CacheMetricGroup metricGroup) 
{
         }
         metricGroup.hitCounter(hitCounter);
         metricGroup.missCounter(new SimpleCounter()); // always zero
-        cacheLoader.open(metricGroup);
-    }
+        cacheLoader.initializeMetrics(metricGroup);
 
-    public synchronized void open(Configuration parameters) throws Exception {
         if (reloadTriggerContext == null) {
-            cacheLoader.open(parameters);
-            reloadTriggerContext =
-                    new ReloadTriggerContext(
-                            cacheLoader,
-                            th -> {
-                                if (reloadFailCause == null) {
-                                    reloadFailCause = th;
-                                } else {
-                                    reloadFailCause.addSuppressed(th);
-                                }
-                            });
-
-            reloadTrigger.open(reloadTriggerContext);
-            cacheLoader.awaitFirstLoad();
+            try {
+                // TODO add Configuration into FunctionContext and pass in 
into LookupFullCache

Review Comment:
   If we want to have a follow-up, we should create a FLINK Jira issue.



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** TestInputFormat that reads data from (2 + delta) splits which share the 
same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share 
the same {@code queue}.
+ */
 public class FullCacheTestInputFormat
         extends RichInputFormat<RowData, 
FullCacheTestInputFormat.QueueInputSplit> {
 
     public static final AtomicInteger OPEN_CLOSED_COUNTER = new 
AtomicInteger(0);
-    private static final int DEFAULT_NUM_SPLITS = 2;
+    public static final int DEFAULT_NUM_SPLITS = 2;
+    public static final int DEFAULT_DELTA_NUM_SPLITS = 0;

Review Comment:
   tbh, it appears that we could even make these values `private`. The only 
time, they are used is in `InputFormatCacheLoaderTest` which doesn't seem to 
rely on these values but just uses them as "some" input value. Or is there a 
specific reason why we need the default values as input values? :thinking: 



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -178,12 +184,16 @@ private void assertCacheContent(Map<RowData, 
Collection<RowData>> actual) {
                         
assertThat(rows).containsExactlyInAnyOrderElementsOf(actual.get(key)));
     }
 
+    private void run(CacheLoader cacheLoader) {

Review Comment:
   ```suggestion
       private void reloadSynchronously(CacheLoader cacheLoader) {
   ```
   nit: that would be a better description of what this method is doing.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java:
##########
@@ -26,11 +26,11 @@
 /** Runtime implementation of {@link CacheReloadTrigger.Context}. */
 public class ReloadTriggerContext implements CacheReloadTrigger.Context {
 
-    private final Runnable reloadTask;
+    private final CacheLoader cacheLoader;
     private final Consumer<Throwable> reloadFailCallback;
 
-    public ReloadTriggerContext(Runnable reloadTask, Consumer<Throwable> 
reloadFailCallback) {
-        this.reloadTask = reloadTask;
+    public ReloadTriggerContext(CacheLoader cacheLoader, Consumer<Throwable> 
reloadFailCallback) {

Review Comment:
   Just as a general side-note for future improvements: Using an interface here 
might help the testability of the code. `ReloadTriggerContext` does not 
necessarily require `CacheLoader` but only the single function 
`CacheLoader::reloadAsync()` which you can pass in through 
`Supplier<CompletableFuture<Void>>` (since you're passing in another functional 
interface for the error handling already). But again, that's just meant as a 
viewpoint for future code changes. :-)



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** TestInputFormat that reads data from (2 + delta) splits which share the 
same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share 
the same {@code queue}.

Review Comment:
   ```suggestion
    * TestInputFormat that reads data from ({@link #numSplits} + {@link 
#deltaNumSplits}) splits which share the same {@code queue}.
   ```
   What is `initNum`? I guess, you were referring to the newly added field 
`numSplits` analogously to the field `deltaNumSplits`?



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** TestInputFormat that reads data from (2 + delta) splits which share the 
same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share 
the same {@code queue}.
+ */
 public class FullCacheTestInputFormat
         extends RichInputFormat<RowData, 
FullCacheTestInputFormat.QueueInputSplit> {
 
     public static final AtomicInteger OPEN_CLOSED_COUNTER = new 
AtomicInteger(0);
-    private static final int DEFAULT_NUM_SPLITS = 2;
+    public static final int DEFAULT_NUM_SPLITS = 2;
+    public static final int DEFAULT_DELTA_NUM_SPLITS = 0;

Review Comment:
   ```suggestion
       static final int DEFAULT_NUM_SPLITS = 2;
       static final int DEFAULT_DELTA_NUM_SPLITS = 0;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to