XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1038080578
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
         } catch (InterruptedException ignored) { // we use interrupt to close reload thread
         } finally {
             if (cacheLoadTaskService != null) {
+                // if main cache reload thread encountered an exception,
+                // it interrupts underlying InputSplitCacheLoadTasks threads
                 cacheLoadTaskService.shutdownNow();
Review Comment:
I just noticed that there's also `ExecutorUtils.gracefulShutdown`. Maybe it would be worth using here, as it also includes proper logging in case of an error.
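For illustration, the shutdown could then look something like this (a minimal, self-contained sketch; the 10-second timeout is just a placeholder, pick whatever fits the reload semantics):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.util.ExecutorUtils;

class GracefulShutdownSketch {
    public static void main(String[] args) {
        ExecutorService cacheLoadTaskService = Executors.newFixedThreadPool(2);
        try {
            // submit cache load tasks here ...
        } finally {
            // shuts the service down, waits up to the timeout, then falls back
            // to shutdownNow() and logs if tasks failed to terminate in time
            ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, cacheLoadTaskService);
        }
    }
}
```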
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.values().stream().mapToInt(Collection::size).sum();
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();
Review Comment:
Doing that leads to a deadlock. Even just adding a `Thread.sleep` shortly before closing the cache loader makes the test run into a deadlock, because [CacheLoader:101](https://github.com/apache/flink/blob/340b100f2de5e0d90ba475aa8a00e359a61442ce/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java#L101) acquires the `CacheLoader.reloadLock` and initiates the reloading of the cache synchronously while holding the lock. The data reload blocks due to the `CountDownLatch` we use in the test. The subsequent call of close triggers the shutdown of the `InputSplitCacheLoadTask` instances through `InputSplitCacheLoadTask#stopRunning`. But the shutdown of those tasks will never complete because the data loading is still blocked and the tasks are stuck in the [while loop](https://github.com/apache/flink/blob/340b100f2de5e0d90ba475aa8a00e359a61442ce/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java#L71). [InputFormatCacheLoader#close](https://github.com/apache/flink/blob/a024a366f73f822bb4fd35db737ac2b8177f6b25/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java#L129) then continues with calling `CacheLoader#close`, which tries to acquire the lock that's already held by the thread executing `InputFormatCacheLoader`, and we end up in a deadlock.
My suspicion is that the test as it is pushed right now passes because we're calling close before the data reload is initiated. AFAIU, that's not what we want to test, is it? Please correct me if I'm wrong here.
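To illustrate the pattern in isolation (a minimal, self-contained sketch with hypothetical names, not the actual Flink classes): thread A holds a lock while blocking on a latch, and the closing thread needs that same lock before it would ever release the latch.
```java
import java.util.concurrent.CountDownLatch;

// Minimal reproduction of the lock-while-blocked pattern described above.
class DeadlockSketch {
    private final Object reloadLock = new Object();
    private final CountDownLatch dataLatch = new CountDownLatch(1);

    void reload() throws InterruptedException {
        synchronized (reloadLock) { // corresponds to CacheLoader acquiring reloadLock
            dataLatch.await();      // data loading blocks (the test's CountDownLatch)
        }
    }

    void close() {
        // close() needs reloadLock before it can release the latch,
        // but reload() holds the lock while waiting -> deadlock
        synchronized (reloadLock) {
            dataLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DeadlockSketch sketch = new DeadlockSketch();
        Thread reloader = new Thread(() -> {
            try {
                sketch.reload();
            } catch (InterruptedException ignored) {
            }
        });
        reloader.start();
        Thread.sleep(100); // let reload() grab the lock first
        sketch.close();    // never returns: hangs on the synchronized block
    }
}
```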
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -131,15 +133,19 @@ void testExceptionDuringReload() throws Exception {
     }

     @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+    void testInterruptDuringReload() throws Exception {
+        CountDownLatch recordsProcessingLatch = new CountDownLatch(1);
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
+                () -> {
+                    try {
+                        // wait should be interrupted if everything works ok
+                        if (!recordsProcessingLatch.await(5, TimeUnit.SECONDS)) {
+                            throw new RuntimeException("timeout");
+                        }
Review Comment:
```suggestion
                        assertThatThrownBy(recordsProcessingLatch::await)
                                .as("wait should be interrupted if everything works ok")
                                .isInstanceOf(InterruptedException.class);
                        Thread.currentThread().interrupt(); // restore interrupted status
```
I guess we could get rid of the 5 seconds here. Waiting forever enables us to generate the thread dump at the end, which gives more insight into what went wrong during the test execution.
nit: I played around with the assertj API a bit more and utilized the comment as an assertion message. WDYT?
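For reference, here's a self-contained sketch of the interplay between the untimed `await` and the interrupt (not tested against the PR branch; the worker thread stands in for the reload thread):
```java
import java.util.concurrent.CountDownLatch;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

class InterruptAssertionSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            // the untimed await blocks until the interrupt arrives;
            // assertThatThrownBy turns the InterruptedException into a passing assertion
            assertThatThrownBy(latch::await)
                    .as("wait should be interrupted if everything works ok")
                    .isInstanceOf(InterruptedException.class);
            Thread.currentThread().interrupt(); // restore interrupted status
        });
        worker.start();
        worker.interrupt(); // simulates executorService.shutdownNow()
        worker.join();
    }
}
```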
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.values().stream().mapToInt(Collection::size).sum();
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();
Review Comment:
```suggestion
reloadActionReachedTrigger.trigger();
latch.await();
```
Don't we have to add something like a `OneShotLatch#trigger` here and wait for it before calling `cacheLoader.close()` later in the test? That way, we make sure that we're actually at the stage where the reload tasks are instantiated before closing the loader.
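Something along these lines is what I mean (a sketch using Flink's `OneShotLatch` test utility; the runnable stands in for the actual cache loader, and `reloadActionReachedTrigger` is a name I made up):
```java
import org.apache.flink.core.testutils.OneShotLatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// sketch: synchronize the test with the reload action so close() is only
// called once the reload is actually in-flight
class CloseDuringReloadSketch {
    public static void main(String[] args) throws Exception {
        OneShotLatch reloadActionReachedTrigger = new OneShotLatch();
        CountDownLatch latch = new CountDownLatch(1);
        Runnable reloadAction =
                () -> {
                    reloadActionReachedTrigger.trigger(); // signal: reload reached the action
                    try {
                        latch.await(); // block the reload until the test releases it
                    } catch (InterruptedException ignored) {
                    }
                };

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> future = executorService.submit(reloadAction);
        reloadActionReachedTrigger.await(); // ensure the reload is running ...
        // ... before calling close() here, e.g. cacheLoader.close();
        latch.countDown();
        future.get();
        executorService.shutdown();
    }
}
```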
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
         } catch (InterruptedException ignored) { // we use interrupt to close reload thread
         } finally {
             if (cacheLoadTaskService != null) {
+                // if main cache reload thread encountered an exception,
+                // it interrupts underlying InputSplitCacheLoadTasks threads
                 cacheLoadTaskService.shutdownNow();
Review Comment:
Yeah, I was just curious about your opinion on the usage of the common pool. It felt like implementing it asynchronously would have worked here. I just struggled to think of a better way to scale the number of loading threads with the number of `InputSplits` to optimize the resource utilization per cache loading cycle.
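For what it's worth, a naive way to size the pool per reload cycle might look like this (just a sketch; `MAX_LOAD_THREADS` and the method name are placeholders):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// sketch: bound the number of loader threads by the number of input splits,
// so small reloads don't spin up more threads than they can use
class PoolSizingSketch {
    private static final int MAX_LOAD_THREADS = 4; // placeholder upper bound

    static ExecutorService createCacheLoadTaskService(int numInputSplits) {
        int poolSize = Math.max(1, Math.min(numInputSplits, MAX_LOAD_THREADS));
        return Executors.newFixedThreadPool(poolSize);
    }
}
```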
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.size() + 1; // 1 key with 2 records
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();
+                        });
+        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+        cacheLoader.open(metricGroup);
         // check closing
-        executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cacheLoader);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cacheLoader);
         cacheLoader.close();
-        assertThatNoException().isThrownBy(future::get); // wait for the end
+        latch.countDown();
+        future.get(); // wait for the end
+        executorService.shutdown();
Review Comment:
Probably you're right. I checked how we dealt with threads in other cases. But since the future itself has already completed, it should be fine. My initial concern was that the thread itself doesn't finalize properly before the test passes and that it could linger around while other tests are executed. But it looks like that's not an issue.
What we could do, though, is add a try/finally block to make sure that shutdown is also triggered if something else causes an exception to be thrown, so that the thread is shut down properly in that case as well.
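i.e. something like this (sketch only; the runnable and latch stand in for the actual cache loader from the test above):
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// sketch: guarantee executor shutdown even if an assertion or call
// inside the try-block throws
class ShutdownInFinallySketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        Runnable cacheLoader = latch::countDown; // stand-in for the real cache loader
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executorService.submit(cacheLoader);
            latch.await();
            future.get(); // wait for the end
        } finally {
            executorService.shutdown(); // runs on both success and failure paths
        }
    }
}
```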