XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1084051452
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,56 +72,56 @@ public void open(Configuration parameters) throws Exception {
}
@Override
- protected void reloadCache() throws Exception {
+ protected boolean updateCache() throws Exception {
InputSplit[] inputSplits = createInputSplits();
int numSplits = inputSplits.length;
+ int concurrencyLevel = getConcurrencyLevel(numSplits);
// load data into the another copy of cache
- // notice: it requires twice more memory, but on the other hand we don't need any blocking
+ // notice: it requires twice more memory, but on the other hand we don't need any blocking;
// cache has default initialCapacity and loadFactor, but overridden concurrencyLevel
ConcurrentHashMap<RowData, Collection<RowData>> newCache =
- new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(numSplits));
- this.cacheLoadTasks =
+ new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+ Deque<InputSplitCacheLoadTask> cacheLoadTasks =
Arrays.stream(inputSplits)
.map(split -> createCacheLoadTask(split, newCache))
- .collect(Collectors.toList());
- if (isStopped) {
- // check for cases when #close was called during reload before creating cacheLoadTasks
- return;
- }
- // run first task or create numSplits threads to run all tasks
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ // run first task and create concurrencyLevel - 1 threads to run remaining tasks
ExecutorService cacheLoadTaskService = null;
+ boolean wasInterrupted;
Review Comment:
```suggestion
boolean wasInterrupted = false;
```
I know that `false` is the default value but for readability purposes it's
good to make this explicit, I think.
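To illustrate the reviewer's point, here is a minimal, hypothetical sketch (class and method names are illustrative, not the PR's actual code) of the interrupt-flag pattern this field presumably feeds into, with the explicit initialization the suggestion asks for:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not the PR's code: drains a queue of load tasks and
// records whether the loading thread was interrupted along the way.
public class InterruptFlagSketch {
    interface CacheTask {
        void run() throws InterruptedException;
    }

    /** Runs tasks until done or interrupted; returns true if interrupted. */
    static boolean runTasks(Deque<CacheTask> tasks) {
        boolean wasInterrupted = false; // explicit, as the review suggests
        while (!wasInterrupted && !tasks.isEmpty()) {
            try {
                tasks.pop().run();
            } catch (InterruptedException e) {
                wasInterrupted = true;
                Thread.currentThread().interrupt(); // restore interrupt status
            }
        }
        return wasInterrupted;
    }

    public static void main(String[] args) {
        Deque<CacheTask> tasks = new ArrayDeque<>();
        tasks.push(() -> { });
        System.out.println(runTasks(tasks)); // false when nothing interrupts
    }
}
```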
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -107,12 +107,12 @@ public void open(FunctionContext context) throws Exception {
numLoadFailuresCounter = new SimpleCounter();
cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
}
- // Initialize cache and the delegating function
- cache.open(cacheMetricGroup);
if (cache instanceof LookupFullCache) {
// TODO add Configuration into FunctionContext
- ((LookupFullCache) cache).open(new Configuration());
+ ((LookupFullCache) cache).setParameters(new Configuration());
Review Comment:
What value does this call bring here? Can't we get rid of the if block
entirely? We're setting an empty Configuration here, which is already the
default within `CachingLookupFunction`. I don't see
`LookupFullCache.setParameters` being called anywhere else? :thinking:
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -107,12 +107,12 @@ public void open(FunctionContext context) throws Exception {
numLoadFailuresCounter = new SimpleCounter();
cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
}
- // Initialize cache and the delegating function
- cache.open(cacheMetricGroup);
if (cache instanceof LookupFullCache) {
// TODO add Configuration into FunctionContext
Review Comment:
I know that this is not part of the current PR. But just as a side-note:
It's better to create follow-up Jira issues rather than hiding planned changes
in a TODO comment.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,58 +72,58 @@ public void open(Configuration parameters) throws Exception {
}
@Override
- protected void reloadCache() throws Exception {
+ protected void updateCache() throws Exception {
InputSplit[] inputSplits = createInputSplits();
int numSplits = inputSplits.length;
+ int concurrencyLevel = getConcurrencyLevel(numSplits);
// load data into the another copy of cache
- // notice: it requires twice more memory, but on the other hand we don't need any blocking
+ // notice: it requires twice more memory, but on the other hand we don't need any blocking;
// cache has default initialCapacity and loadFactor, but overridden concurrencyLevel
ConcurrentHashMap<RowData, Collection<RowData>> newCache =
- new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(numSplits));
- this.cacheLoadTasks =
+ new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+ Deque<InputSplitCacheLoadTask> cacheLoadTasks =
Arrays.stream(inputSplits)
.map(split -> createCacheLoadTask(split, newCache))
- .collect(Collectors.toList());
- if (isStopped) {
- // check for cases when #close was called during reload before creating cacheLoadTasks
- return;
- }
- // run first task or create numSplits threads to run all tasks
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ // run first task and create concurrencyLevel - 1 threads to run remaining tasks
ExecutorService cacheLoadTaskService = null;
try {
- if (numSplits > 1) {
- int numThreads = getConcurrencyLevel(numSplits);
- cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
- ExecutorService finalCacheLoadTaskService = cacheLoadTaskService;
- List<Future<?>> futures =
+ InputSplitCacheLoadTask firstTask = cacheLoadTasks.pop();
+ CompletableFuture<?> otherTasksFuture = null;
+ if (!cacheLoadTasks.isEmpty()) {
+ cacheLoadTaskService = Executors.newFixedThreadPool(concurrencyLevel - 1);
+ ExecutorService finalExecutor = cacheLoadTaskService;
Review Comment:
ah true, I overlooked that one. 8)
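The scheduling scheme in the hunk above can be sketched as follows. This is a minimal, hedged illustration (names like `loadAll` are invented for the sketch, and the per-split work is stubbed out with a counter): the calling thread runs the first task itself, while a pool of `concurrencyLevel - 1` threads handles the remaining splits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of "run first task inline, farm out the rest";
// assumes concurrencyLevel >= 2 whenever more than one task exists.
public class FirstTaskInlineSketch {
    static int loadAll(int numSplits, int concurrencyLevel) {
        AtomicInteger loaded = new AtomicInteger();
        Deque<Runnable> tasks = new ArrayDeque<>();
        for (int i = 0; i < numSplits; i++) {
            tasks.add(loaded::incrementAndGet); // stands in for one split's load
        }
        Runnable firstTask = tasks.pop();
        CompletableFuture<?> otherTasksFuture = null;
        ExecutorService executor = null;
        try {
            if (!tasks.isEmpty()) {
                executor = Executors.newFixedThreadPool(concurrencyLevel - 1);
                ExecutorService finalExecutor = executor;
                otherTasksFuture = CompletableFuture.allOf(
                        tasks.stream()
                                .map(t -> CompletableFuture.runAsync(t, finalExecutor))
                                .toArray(CompletableFuture[]::new));
            }
            firstTask.run(); // the calling thread does its share of the work
            if (otherTasksFuture != null) {
                otherTasksFuture.join(); // wait for the remaining splits
            }
        } finally {
            if (executor != null) {
                executor.shutdownNow();
            }
        }
        return loaded.get();
    }

    public static void main(String[] args) {
        System.out.println(loadAll(4, 2)); // all 4 splits loaded
    }
}
```

The `finalExecutor` alias mirrors the PR's `finalExecutor`: a lambda may only capture effectively final locals, and `cacheLoadTaskService` is reassigned.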
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -109,7 +116,9 @@ public long size() {
@Override
public void close() throws Exception {
- reloadTrigger.close(); // firstly try to interrupt reload thread
+ // in default triggers shutdowns scheduled thread pool used for periodic triggers of reloads
Review Comment:
```suggestion
    // stops scheduled thread pool that's responsible for scheduling cache updates
```
Just as a proposal
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -109,7 +116,9 @@ public long size() {
@Override
public void close() throws Exception {
- reloadTrigger.close(); // firstly try to interrupt reload thread
+ // in default triggers shutdowns scheduled thread pool used for periodic triggers of reloads
+ reloadTrigger.close();
+ // shutdowns the reload thread and interrupts active reload task, if present
Review Comment:
```suggestion
    // stops thread pool that's responsible for executing the actual cache update
```
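The two-phase shutdown order that these comments describe can be sketched as below. This is a hedged illustration, not the PR's `LookupFullCache`: the field names are invented, and the two pools stand in for the reload trigger's scheduler and the reload worker thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the close() ordering: first stop scheduling new
// reloads, then stop (and interrupt) the pool that executes the reload itself.
public class TwoPhaseCloseSketch implements AutoCloseable {
    private final ScheduledExecutorService reloadScheduler =
            Executors.newSingleThreadScheduledExecutor(); // triggers periodic reloads
    private final ExecutorService reloadExecutor =
            Executors.newSingleThreadExecutor(); // runs the actual cache update

    @Override
    public void close() {
        // 1. No new reloads get scheduled after this point.
        reloadScheduler.shutdownNow();
        // 2. Interrupt the reload thread; an in-flight reload task, if
        //    present, observes the interrupt and aborts.
        reloadExecutor.shutdownNow();
        try {
            reloadExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean isClosed() {
        return reloadScheduler.isShutdown() && reloadExecutor.isShutdown();
    }

    public static void main(String[] args) {
        TwoPhaseCloseSketch cache = new TwoPhaseCloseSketch();
        cache.close();
        System.out.println(cache.isClosed()); // true
    }
}
```

Stopping the scheduler first matters: otherwise a freshly triggered reload could be submitted to the worker pool after it has already been shut down.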
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]