SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1084842440
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,56 +72,56 @@ public void open(Configuration parameters) throws Exception
{
}
@Override
- protected void reloadCache() throws Exception {
+ protected boolean updateCache() throws Exception {
InputSplit[] inputSplits = createInputSplits();
int numSplits = inputSplits.length;
+ int concurrencyLevel = getConcurrencyLevel(numSplits);
// load data into the another copy of cache
- // notice: it requires twice more memory, but on the other hand we don't need any blocking
+ // notice: it requires twice more memory, but on the other hand we don't need any blocking;
// cache has default initialCapacity and loadFactor, but overridden concurrencyLevel
ConcurrentHashMap<RowData, Collection<RowData>> newCache =
- new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(numSplits));
- this.cacheLoadTasks =
+ new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+ Deque<InputSplitCacheLoadTask> cacheLoadTasks =
Arrays.stream(inputSplits)
.map(split -> createCacheLoadTask(split, newCache))
- .collect(Collectors.toList());
- if (isStopped) {
- // check for cases when #close was called during reload before creating cacheLoadTasks
- return;
- }
- // run first task or create numSplits threads to run all tasks
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ // run first task and create concurrencyLevel - 1 threads to run remaining tasks
ExecutorService cacheLoadTaskService = null;
+ boolean wasInterrupted;
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]