XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1085241002
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +127,48 @@ void testExceptionDuringReload() throws Exception {
() -> {
throw exception;
};
- InputFormatCacheLoader cacheLoader = createCacheLoader(0,
reloadAction);
- InterceptingCacheMetricGroup metricGroup = new
InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
- assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
- assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(DEFAULT_NUM_SPLITS,
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ InterceptingCacheMetricGroup metricGroup = new
InterceptingCacheMetricGroup();
+ cacheLoader.initializeMetrics(metricGroup);
+ assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+ assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0);
+
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ }
}
- @Test
- void testCloseAndInterruptDuringReload() throws Exception {
- AtomicInteger sleepCounter = new AtomicInteger(0);
- int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to
number of all rows
+ /**
+ * Cache loader creates additional threads in case of multiple input
splits. In both cases cache
+ * loader must correctly react on close and interrupt all threads.
+ */
+ @ParameterizedTest
+ @MethodSource("numSplits")
+ void testCloseDuringReload(int numSplits) throws Exception {
+ OneShotLatch reloadLatch = new OneShotLatch();
Runnable reloadAction =
- ThrowingRunnable.unchecked(
- () -> {
- sleepCounter.incrementAndGet();
- Thread.sleep(1000);
- });
- InputFormatCacheLoader cacheLoader = createCacheLoader(0,
reloadAction);
+ () -> {
+ reloadLatch.trigger();
+ assertThatThrownBy(() -> new OneShotLatch().await())
+ .as("Wait should be interrupted if everything
works ok")
+ .isInstanceOf(InterruptedException.class);
+ Thread.currentThread().interrupt(); // restore interrupted
status
+ };
InterceptingCacheMetricGroup metricGroup = new
InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
-
- // check interruption
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cacheLoader);
- executorService.shutdownNow(); // internally interrupts a thread
- assertThatNoException().isThrownBy(future::get); // wait for the end
- // check that we didn't process all elements, but reacted on
interruption
- assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ CompletableFuture<Void> future;
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS,
reloadAction)) {
+ cacheLoader.initializeMetrics(metricGroup);
+ future = cacheLoader.reloadAsync();
+ reloadLatch.await();
+ }
+ // try-with-resources calls #close, which should wait for the end of
reload
+ assertThat(future.isDone()).isTrue();
Review Comment:
```suggestion
    // try-with-resources calls #close, which interrupts any running threads
    and waits for the reload to finish
    assertThat(future.isDone()).as("The reload future should still
    complete successfully, indicating that the reload was intentionally stopped
    without an error.").isTrue();
```
Is this correct? At least that's how I understand this test. :thinking: Feel
free to change/revert
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -57,25 +59,28 @@ public synchronized void open(CacheMetricGroup metricGroup)
{
}
metricGroup.hitCounter(hitCounter);
metricGroup.missCounter(new SimpleCounter()); // always zero
- cacheLoader.open(metricGroup);
- }
+ cacheLoader.initializeMetrics(metricGroup);
- public synchronized void open(Configuration parameters) throws Exception {
if (reloadTriggerContext == null) {
- cacheLoader.open(parameters);
- reloadTriggerContext =
- new ReloadTriggerContext(
- cacheLoader,
- th -> {
- if (reloadFailCause == null) {
- reloadFailCause = th;
- } else {
- reloadFailCause.addSuppressed(th);
- }
- });
-
- reloadTrigger.open(reloadTriggerContext);
- cacheLoader.awaitFirstLoad();
+ try {
+ // TODO add Configuration into FunctionContext and pass in
into LookupFullCache
Review Comment:
If we want to have a follow-up, we should create a FLINK Jira issue.
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** TestInputFormat that reads data from (2 + delta) splits which share the
same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share
the same {@code queue}.
+ */
public class FullCacheTestInputFormat
extends RichInputFormat<RowData,
FullCacheTestInputFormat.QueueInputSplit> {
public static final AtomicInteger OPEN_CLOSED_COUNTER = new
AtomicInteger(0);
- private static final int DEFAULT_NUM_SPLITS = 2;
+ public static final int DEFAULT_NUM_SPLITS = 2;
+ public static final int DEFAULT_DELTA_NUM_SPLITS = 0;
Review Comment:
tbh, it appears that we could even make these values `private`. The only
time they are used is in `InputFormatCacheLoaderTest`, which doesn't seem to
rely on these values but just uses them as "some" input value. Or is there a
specific reason why we need the default values as input values? :thinking:
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -178,12 +184,16 @@ private void assertCacheContent(Map<RowData,
Collection<RowData>> actual) {
assertThat(rows).containsExactlyInAnyOrderElementsOf(actual.get(key)));
}
+ private void run(CacheLoader cacheLoader) {
Review Comment:
```suggestion
private void reloadSynchronously(CacheLoader cacheLoader) {
```
nit: that would be a better description of what this method is doing.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java:
##########
@@ -26,11 +26,11 @@
/** Runtime implementation of {@link CacheReloadTrigger.Context}. */
public class ReloadTriggerContext implements CacheReloadTrigger.Context {
- private final Runnable reloadTask;
+ private final CacheLoader cacheLoader;
private final Consumer<Throwable> reloadFailCallback;
- public ReloadTriggerContext(Runnable reloadTask, Consumer<Throwable>
reloadFailCallback) {
- this.reloadTask = reloadTask;
+ public ReloadTriggerContext(CacheLoader cacheLoader, Consumer<Throwable>
reloadFailCallback) {
Review Comment:
Just as a general side-note for future improvements: Using an interface here
might help the testability of the code. `ReloadTriggerContext` does not
necessarily require `CacheLoader` but only the single function
`CacheLoader::reloadAsync()` which you can pass in through
`Supplier<CompletableFuture<Void>>` (since you're passing in another functional
interface for the error handling already). But again, that's just meant as a
viewpoint for future code changes. :-)
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** TestInputFormat that reads data from (2 + delta) splits which share the
same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share
the same {@code queue}.
Review Comment:
```suggestion
* TestInputFormat that reads data from ({@link #numSplits} + {@link
#deltaNumSplits}) splits which share the same {@code queue}.
```
What is `initNum`? I guess you were referring to the newly added field
`numSplits`, analogously to the field `deltaNumSplits`?
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -41,18 +41,22 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** TestInputFormat that reads data from (2 + delta) splits which share the
same {@code queue}. */
+/**
+ * TestInputFormat that reads data from (initNum + delta) splits which share
the same {@code queue}.
+ */
public class FullCacheTestInputFormat
extends RichInputFormat<RowData,
FullCacheTestInputFormat.QueueInputSplit> {
public static final AtomicInteger OPEN_CLOSED_COUNTER = new
AtomicInteger(0);
- private static final int DEFAULT_NUM_SPLITS = 2;
+ public static final int DEFAULT_NUM_SPLITS = 2;
+ public static final int DEFAULT_DELTA_NUM_SPLITS = 0;
Review Comment:
```suggestion
static final int DEFAULT_NUM_SPLITS = 2;
static final int DEFAULT_DELTA_NUM_SPLITS = 0;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]