XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1053127950
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -57,7 +57,7 @@ public synchronized void open(CacheMetricGroup metricGroup) {
}
metricGroup.hitCounter(hitCounter);
metricGroup.missCounter(new SimpleCounter()); // always zero
- cacheLoader.open(metricGroup);
+ cacheLoader.initializeMetrics(metricGroup);
}
public synchronized void open(Configuration parameters) throws Exception {
Review Comment:
There's the same naming issue with the `open` methods in `LookupFullCache`. It doesn't help readability if we have the same method name with a different signature and a different purpose. What about renaming `open(Configuration)` to something like `initializeReloadTrigger(Configuration)`? :thinking:
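A minimal sketch of how that could read (method bodies elided; `initializeReloadTrigger` is just the proposed name, not an existing method):
```java
// Sketch only: proposed naming in LookupFullCache, method bodies elided.
public synchronized void open(CacheMetricGroup metricGroup) {
    // ... register hit/miss counters as before ...
    cacheLoader.initializeMetrics(metricGroup);
}

// renamed from open(Configuration) so the distinct purpose shows up at the call site
public synchronized void initializeReloadTrigger(Configuration parameters) throws Exception {
    // ... existing reload-trigger setup ...
}
```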
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java:
##########
@@ -86,20 +92,21 @@ public FullCacheTestInputFormat(
this.projectable = generatedProjection.isPresent();
this.generatedProjection = generatedProjection.orElse(null);
this.rowConverter = rowConverter;
- this.deltaNumSplits = 0;
+ this.numSplits = DEFAULT_NUM_SPLITS;
+ this.deltaNumSplits = DEFAULT_DELTA_NUM_SPLITS;
Review Comment:
You could call `this(dataRows, generatedProjection, rowConverter, DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS)` instead of initializing all the fields here again.
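For illustration, roughly (parameter types are assumed here, not checked against the actual constructor):
```java
// Hypothetical sketch: delegate to the five-argument constructor
// instead of repeating the field assignments.
public FullCacheTestInputFormat(
        Collection<Row> dataRows,
        Optional<GeneratedProjection> generatedProjection,
        DataFormatConverter<RowData, Row> rowConverter) {
    this(dataRows, generatedProjection, rowConverter, DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS);
}
```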
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -77,43 +77,46 @@ void checkCounter() {
@ParameterizedTest
@MethodSource("deltaNumSplits")
void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
- InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
- cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
- cacheLoader.run();
- ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache();
- assertCacheContent(cache);
- cacheLoader.run();
- assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance of cache after reload
- cacheLoader.close();
- assertThat(cacheLoader.getCache().size()).isZero(); // cache is cleared after close
+ try (InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits)) {
+ cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
+ run(cacheLoader);
+ ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache();
+ assertCacheContent(cache);
+ run(cacheLoader);
+ // new instance of cache after reload
+ assertThat(cacheLoader.getCache()).isNotSameAs(cache);
+ cacheLoader.close();
Review Comment:
```suggestion
```
This is already handled by the try-with-resources block.
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
() -> {
throw exception;
};
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
- InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
- assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
- assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+ cacheLoader.initializeMetrics(metricGroup);
+ assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+ assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ }
}
- @Test
- void testCloseAndInterruptDuringReload() throws Exception {
- AtomicInteger sleepCounter = new AtomicInteger(0);
- int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+ /**
+ * Cache loader creates additional threads in case of multiple input splits. In both cases cache
+ * loader must correctly react on close and interrupt all threads.
+ */
+ @ParameterizedTest
+ @MethodSource("numSplits")
+ void testCloseDuringReload(int numSplits) throws Exception {
+ OneShotLatch reloadLatch = new OneShotLatch();
Runnable reloadAction =
- ThrowingRunnable.unchecked(
- () -> {
- sleepCounter.incrementAndGet();
- Thread.sleep(1000);
- });
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+ () -> {
+ reloadLatch.trigger();
+ assertThatThrownBy(() -> new OneShotLatch().await())
+ .as("Wait should be interrupted if everything works ok")
+ .isInstanceOf(InterruptedException.class);
+ Thread.currentThread().interrupt(); // restore interrupted status
+ };
InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
-
- // check interruption
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cacheLoader);
- executorService.shutdownNow(); // internally interrupts a thread
- assertThatNoException().isThrownBy(future::get); // wait for the end
- // check that we didn't process all elements, but reacted on interruption
- assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ CompletableFuture<Void> future;
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ cacheLoader.initializeMetrics(metricGroup);
+ future = cacheLoader.reloadAsync();
+ reloadLatch.await(5000, TimeUnit.MILLISECONDS);
+ }
+ // try-with-resources calls #close, which should wait for the end of reload
+ assertThat(future.isDone()).isTrue();
+ assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1);
+ assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+ }
- sleepCounter.set(0);
-
- // check closing
- executorService = Executors.newSingleThreadExecutor();
- future = executorService.submit(cacheLoader);
- cacheLoader.close();
- assertThatNoException().isThrownBy(future::get); // wait for the end
- // check that we didn't process all elements, but reacted on closing
- assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
- assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+ static Stream<Arguments> numSplits() {
+ return Stream.of(Arguments.of(1), Arguments.of(2));
Review Comment:
AFAIU, `numSplits` being `1` means that no extra fixed-size thread pool is created in the `InputFormatCacheLoader`, whereas for `2` we expect additional threads to be created. Is this considered an implementation detail, or shall we test this behavior through some `@VisibleForTesting` utility method in `InputFormatCacheLoader` that returns the thread pool (which would be `null` in the case of `numSplits=1`)? :thinking:
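Roughly something like this (assuming the loader kept the pool in a field; the field and method names here are made up):
```java
// Hypothetical sketch in InputFormatCacheLoader.
@VisibleForTesting
@Nullable
ExecutorService getCacheLoadTaskService() {
    // expected to be null when numSplits == 1, i.e. no extra thread pool was created
    return cacheLoadTaskService;
}
```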
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -77,43 +77,46 @@ void checkCounter() {
@ParameterizedTest
@MethodSource("deltaNumSplits")
void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
- InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
- cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
- cacheLoader.run();
- ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache();
- assertCacheContent(cache);
- cacheLoader.run();
- assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance of cache after reload
- cacheLoader.close();
- assertThat(cacheLoader.getCache().size()).isZero(); // cache is cleared after close
+ try (InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits)) {
+ cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
+ run(cacheLoader);
+ ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache();
+ assertCacheContent(cache);
+ run(cacheLoader);
+ // new instance of cache after reload
+ assertThat(cacheLoader.getCache()).isNotSameAs(cache);
Review Comment:
```suggestion
assertThat(cacheLoader.getCache()).as("A new instance of the cached data should be loaded.").isNotSameAs(cache);
```
nit: just as another way to document tests in the future: you could use the assertion message for documentation purposes. This adds more value, as it also describes the test during execution.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -109,7 +109,7 @@ public long size() {
@Override
public void close() throws Exception {
- reloadTrigger.close(); // firstly try to interrupt reload thread
+ reloadTrigger.close();
cacheLoader.close();
Review Comment:
```suggestion
reloadTrigger.close();
cacheLoader.close();
```
Just as a discussion item: should we add the purpose of each of these calls as a comment? Or is that considered an implementation detail? AFAIU, the first one shuts down the scheduled thread pool that triggers async cache loading, and the second one shuts down the (fixed-size) cache-loading thread pool.
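If we keep them, maybe something along these lines (based on my understanding above, so please double-check):
```java
@Override
public void close() throws Exception {
    // shut down the scheduled thread pool that triggers async cache reloads
    reloadTrigger.close();
    // shut down the (fixed-size) thread pool that performs the cache loading
    cacheLoader.close();
}
```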
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
() -> {
throw exception;
};
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
- InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
- assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
- assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+ cacheLoader.initializeMetrics(metricGroup);
+ assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+ assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ }
}
- @Test
- void testCloseAndInterruptDuringReload() throws Exception {
- AtomicInteger sleepCounter = new AtomicInteger(0);
- int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+ /**
+ * Cache loader creates additional threads in case of multiple input splits. In both cases cache
+ * loader must correctly react on close and interrupt all threads.
+ */
+ @ParameterizedTest
+ @MethodSource("numSplits")
+ void testCloseDuringReload(int numSplits) throws Exception {
+ OneShotLatch reloadLatch = new OneShotLatch();
Runnable reloadAction =
- ThrowingRunnable.unchecked(
- () -> {
- sleepCounter.incrementAndGet();
- Thread.sleep(1000);
- });
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+ () -> {
+ reloadLatch.trigger();
+ assertThatThrownBy(() -> new OneShotLatch().await())
+ .as("Wait should be interrupted if everything works ok")
+ .isInstanceOf(InterruptedException.class);
+ Thread.currentThread().interrupt(); // restore interrupted status
+ };
InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
-
- // check interruption
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cacheLoader);
- executorService.shutdownNow(); // internally interrupts a thread
- assertThatNoException().isThrownBy(future::get); // wait for the end
- // check that we didn't process all elements, but reacted on interruption
- assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ CompletableFuture<Void> future;
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ cacheLoader.initializeMetrics(metricGroup);
+ future = cacheLoader.reloadAsync();
+ reloadLatch.await(5000, TimeUnit.MILLISECONDS);
+ }
+ // try-with-resources calls #close, which should wait for the end of reload
+ assertThat(future.isDone()).isTrue();
+ assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1);
Review Comment:
That's kind of unexpected: we're closing the loader by interrupting the threads executing `InputSplitCacheLoadTask`. Is this really a case where we want to count the cache loading as a success? :thinking:
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
() -> {
throw exception;
};
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
- InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
- assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
- assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+ cacheLoader.initializeMetrics(metricGroup);
+ assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+ assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+ }
}
- @Test
- void testCloseAndInterruptDuringReload() throws Exception {
- AtomicInteger sleepCounter = new AtomicInteger(0);
- int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+ /**
+ * Cache loader creates additional threads in case of multiple input splits. In both cases cache
+ * loader must correctly react on close and interrupt all threads.
+ */
+ @ParameterizedTest
+ @MethodSource("numSplits")
+ void testCloseDuringReload(int numSplits) throws Exception {
+ OneShotLatch reloadLatch = new OneShotLatch();
Runnable reloadAction =
- ThrowingRunnable.unchecked(
- () -> {
- sleepCounter.incrementAndGet();
- Thread.sleep(1000);
- });
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+ () -> {
+ reloadLatch.trigger();
+ assertThatThrownBy(() -> new OneShotLatch().await())
+ .as("Wait should be interrupted if everything works ok")
+ .isInstanceOf(InterruptedException.class);
+ Thread.currentThread().interrupt(); // restore interrupted status
+ };
InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
- cacheLoader.open(metricGroup);
-
- // check interruption
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(cacheLoader);
- executorService.shutdownNow(); // internally interrupts a thread
- assertThatNoException().isThrownBy(future::get); // wait for the end
- // check that we didn't process all elements, but reacted on interruption
- assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ CompletableFuture<Void> future;
+ try (InputFormatCacheLoader cacheLoader =
+ createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+ cacheLoader.initializeMetrics(metricGroup);
+ future = cacheLoader.reloadAsync();
+ reloadLatch.await(5000, TimeUnit.MILLISECONDS);
Review Comment:
```suggestion
reloadLatch.await();
```
We agreed on not using timeouts in unit tests. This enables us to retrieve
the thread dump at the end of the test suite if a test actually times out (see
[docs](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests)).
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,58 +72,58 @@ public void open(Configuration parameters) throws Exception {
}
@Override
- protected void reloadCache() throws Exception {
+ protected void updateCache() throws Exception {
InputSplit[] inputSplits = createInputSplits();
int numSplits = inputSplits.length;
+ int concurrencyLevel = getConcurrencyLevel(numSplits);
// load data into the another copy of cache
- // notice: it requires twice more memory, but on the other hand we don't need any blocking
+ // notice: it requires twice more memory, but on the other hand we don't need any blocking;
// cache has default initialCapacity and loadFactor, but overridden concurrencyLevel
ConcurrentHashMap<RowData, Collection<RowData>> newCache =
- new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(numSplits));
- this.cacheLoadTasks =
+ new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+ Deque<InputSplitCacheLoadTask> cacheLoadTasks =
Arrays.stream(inputSplits)
.map(split -> createCacheLoadTask(split, newCache))
- .collect(Collectors.toList());
- if (isStopped) {
- // check for cases when #close was called during reload before creating cacheLoadTasks
- return;
- }
- // run first task or create numSplits threads to run all tasks
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ // run first task and create concurrencyLevel - 1 threads to run remaining tasks
ExecutorService cacheLoadTaskService = null;
try {
- if (numSplits > 1) {
- int numThreads = getConcurrencyLevel(numSplits);
- cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
- ExecutorService finalCacheLoadTaskService = cacheLoadTaskService;
- List<Future<?>> futures =
+ InputSplitCacheLoadTask firstTask = cacheLoadTasks.pop();
+ CompletableFuture<?> otherTasksFuture = null;
+ if (!cacheLoadTasks.isEmpty()) {
+ cacheLoadTaskService = Executors.newFixedThreadPool(concurrencyLevel - 1);
+ ExecutorService finalExecutor = cacheLoadTaskService;
Review Comment:
What's the purpose of creating a local variable here that refers to
`cacheLoadTaskService`? :thinking:
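My guess: `cacheLoadTaskService` is reassigned after its declaration and is therefore not effectively final, so the lambda cannot capture it directly; the final local copy is the usual workaround. Condensed illustration (not the actual code):
```java
ExecutorService cacheLoadTaskService = null; // reassigned below -> not effectively final
try {
    cacheLoadTaskService = Executors.newFixedThreadPool(concurrencyLevel - 1);
    // a lambda may only capture (effectively) final locals, hence the copy:
    ExecutorService finalExecutor = cacheLoadTaskService;
    cacheLoadTasks.forEach(task -> finalExecutor.submit(task));
} finally {
    if (cacheLoadTaskService != null) {
        cacheLoadTaskService.shutdownNow();
    }
}
```
If that's the reason, a short comment (or restructuring to avoid the reassignment) might make it clearer.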
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]