[ 
https://issues.apache.org/jira/browse/FLINK-30354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-30354:
----------------------------------
    Description: 
In the course of reviewing FLINK-29405, I came up with a proposal to reduce the 
complexity of the {{LookupFullCache}} implementation and to shrink the number 
of thread pools in use from 3 to 2. Here's the proposal I also shared in a 
[FLINK-29405 PR 
comment|https://github.com/apache/flink/pull/20919#pullrequestreview-1208332584]:

The responsibilities as I see them:
* {{LookupFullCache}} is the composite class for combining the {{CacheLoader}} 
and the {{CacheReloadTrigger}} through the {{ReloadTriggerContext}}
* {{ReloadTriggerContext}} provides an async call to trigger the reload, but 
also some utility methods for providing processing or event time (it's not 
clear to me why these are connected with the reload; based on the TODO 
comments, they look like a future task)
* {{CacheLoader}} is in charge of loading the data into memory (if possible 
concurrently).
* {{CacheReloadTrigger}} provides different strategies to trigger new reloads.

About the different executors:
* The {{CacheReloadTrigger}} implementations utilize a 
{{SingleThreadScheduledExecutor}} which triggers 
{{ReloadTriggerContext::reload}} repeatedly. If the loading takes longer than 
the trigger interval, subsequently triggered calls pile up. I'm wondering 
whether that's what we want (see the sketch after this list).
* {{CacheLoader}} utilizes a {{SingleThreadExecutor}} in 
{{CacheLoader#reloadExecutor}}, which serves as the "main" thread for reloading 
the data. It triggers {{CacheLoader#updateCache}} while holding 
{{CacheLoader#reloadLock}}. {{(InputFormat)CacheLoader#updateCache}} is 
implemented synchronously; the data itself is loaded concurrently, if possible, 
using a {{FixedThreadPool}}.
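
To illustrate the pile-up concern from the first bullet, here is a minimal, 
JDK-only sketch (not Flink code; class and variable names are made up) of what 
happens when a scheduled reload overshoots its period: with fixed-rate 
scheduling the overdue runs fire back-to-back, whereas fixed-delay scheduling 
waits for the previous run to finish.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustration only: a "reload" that takes 500ms but is scheduled every 100ms.
public class ReloadSchedulingSketch {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        Runnable slowReload = () -> {
            System.out.println("reload started at " + System.currentTimeMillis());
            try {
                Thread.sleep(500); // simulates a reload that exceeds the 100ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Fixed rate: overdue executions never run concurrently, but they fire
        // back-to-back once the long-running reload finishes, i.e. they pile up.
        scheduler.scheduleAtFixedRate(slowReload, 0, 100, TimeUnit.MILLISECONDS);

        // Fixed delay (no pile-up): the next reload starts 100ms after the
        // previous one has completed.
        // scheduler.scheduleWithFixedDelay(slowReload, 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(3000);
        scheduler.shutdownNow();
    }
}
{code}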

My proposal is now to reduce the number of thread pools in use: Instead of 
having a {{SingleThreadExecutor}} and a {{FixedThreadPool}} in the 
{{CacheLoader}} implementation, couldn't we come up with a custom 
{{ThreadPoolExecutor}} with a minimum of 1 thread and a maximum equal to the 
number of cores (similar to what is already there with 
[ThreadUtils#newThreadPool|https://github.com/apache/flink/blob/d067629d4d200f940d0b58759459d7ff5832b292/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java#L36])?
 That would free the {{CacheLoader}} from starting and shutting down thread 
pools by moving the pool's ownership from {{CacheLoader}} to 
{{LookupFullCache}}, calling it {{cacheLoadingThreadPool}} (or similar). 
Additionally, the {{ScheduledThreadPool}} currently living in the 
{{CacheReloadTrigger}} implementations could move into {{LookupFullCache}} as 
well, calling it something like {{cacheLoadSchedulingThreadPool}}. 
{{LookupFullCache}} would be in charge of managing all cache-loading-related 
threads. Additionally, it would manage the current execution through 
{{CompletableFuture}}s (one for triggering the reload and one for executing the 
reload). Triggering a reload would require cancelling the current future (if 
it's not completed yet) or ignoring the trigger if we want a reload to finish 
before triggering a new one.
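
As a rough sketch of what such a shared pool could look like (hypothetical 
factory name, not the existing {{ThreadUtils#newThreadPool}} implementation): 
note that a {{ThreadPoolExecutor}} only grows beyond its core size when the 
work queue rejects a task, hence the hand-off queue below.

{code:java}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class CacheLoadingPools {

    private CacheLoadingPools() {}

    // Hypothetical factory for the proposed cacheLoadingThreadPool owned by LookupFullCache.
    public static ThreadPoolExecutor newCacheLoadingThreadPool() {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        1,                                          // minimum: one "main" reload thread
                        Runtime.getRuntime().availableProcessors(), // maximum: one thread per core
                        60L,
                        TimeUnit.SECONDS,                           // reclaim idle threads after 60s
                        new SynchronousQueue<>(),                   // hand-off queue, so the pool can grow to its maximum
                        new ThreadPoolExecutor.CallerRunsPolicy()); // run on the submitting thread when saturated
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
{code}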

{{CacheLoader#updateCache}} would become 
{{CacheLoader#updateCacheAsync(ExecutorService)}} returning a 
{{CompletableFuture}} that completes as soon as all subtasks are completed. 
{{CacheLoader#reloadAsync}} would return this {{CompletableFuture}} instead of 
creating its own future. The lifecycle (as already explained in the previous 
paragraph) would be managed by {{LookupFullCache}}. The benefit would be that 
we wouldn't have to deal with interrupts in {{CacheLoader}}.
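
A minimal sketch of what this could look like, assuming the cache update can be 
split into independent subtasks ({{CacheUpdateTask}} and {{splitIntoSubtasks}} 
are placeholders, not existing Flink classes):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

// Placeholder for one concurrent loading subtask (e.g. one input split).
interface CacheUpdateTask extends Runnable {}

class CacheLoaderSketch {

    CompletableFuture<Void> updateCacheAsync(ExecutorService cacheLoadingThreadPool) {
        List<CacheUpdateTask> subtasks = splitIntoSubtasks();

        // One future per subtask, all running on the shared pool owned by LookupFullCache.
        List<CompletableFuture<Void>> futures =
                subtasks.stream()
                        .map(task -> CompletableFuture.runAsync(task, cacheLoadingThreadPool))
                        .collect(Collectors.toList());

        // Completes as soon as every subtask has completed (exceptionally if one fails).
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    private List<CacheUpdateTask> splitIntoSubtasks() {
        return Collections.emptyList(); // placeholder for the actual splitting logic
    }
}
{code}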

I see the following benefits:
* {{ReloadTriggerContext}} becomes obsolete (one has to clarify what the event 
time and processing time functions are for, though).
* {{CacheLoader#awaitFirstLoad}} becomes obsolete as well. We can verify the 
completion of the cache loading in {{LookupFullCache}} through the 
{{CompletableFuture}} instances (see the sketch after this list).
* {{CacheReloadTrigger}} can focus on the strategy implementation without 
worrying about instantiating threads. This is duplicated code right now in 
{{PeriodicCacheReloadTrigger}} and {{TimedCacheReloadTrigger}}.
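
To illustrate the lifecycle management described above, here is a sketch of how 
{{LookupFullCache}} could track the reload through the futures (class and 
method names are placeholders; both the cancel and the ignore variant are 
shown):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ReloadLifecycleSketch {

    // The currently running (or last completed) reload, owned by LookupFullCache.
    private CompletableFuture<Void> currentReload = CompletableFuture.completedFuture(null);

    // Variant 1: a new trigger cancels a reload that is still in flight.
    synchronized CompletableFuture<Void> triggerReload(Supplier<CompletableFuture<Void>> reloadAsync) {
        if (!currentReload.isDone()) {
            // CompletableFuture#cancel never interrupts the running tasks.
            currentReload.cancel(false);
        }
        currentReload = reloadAsync.get();
        return currentReload;
    }

    // Variant 2: the trigger is ignored while a reload is still running.
    synchronized CompletableFuture<Void> triggerReloadIfIdle(Supplier<CompletableFuture<Void>> reloadAsync) {
        if (currentReload.isDone()) {
            currentReload = reloadAsync.get();
        }
        return currentReload;
    }

    // Completion of the first (or any) load can be observed directly on the future,
    // which is what makes CacheLoader#awaitFirstLoad obsolete.
    synchronized CompletableFuture<Void> currentReload() {
        return currentReload;
    }
}
{code}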



> Reducing the number of ThreadPools in LookupFullCache and related 
> cache-loading classes
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-30354
>                 URL: https://issues.apache.org/jira/browse/FLINK-30354
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Matthias Pohl
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
