SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1066988174
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,58 +72,58 @@ public void open(Configuration parameters) throws Exception
{
}
@Override
- protected void reloadCache() throws Exception {
+ protected void updateCache() throws Exception {
InputSplit[] inputSplits = createInputSplits();
int numSplits = inputSplits.length;
+ int concurrencyLevel = getConcurrencyLevel(numSplits);
// load data into the another copy of cache
- // notice: it requires twice more memory, but on the other hand we
don't need any blocking
+ // notice: it requires twice more memory, but on the other hand we
don't need any blocking;
// cache has default initialCapacity and loadFactor, but overridden
concurrencyLevel
ConcurrentHashMap<RowData, Collection<RowData>> newCache =
- new ConcurrentHashMap<>(16, 0.75f,
getConcurrencyLevel(numSplits));
- this.cacheLoadTasks =
+ new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+ Deque<InputSplitCacheLoadTask> cacheLoadTasks =
Arrays.stream(inputSplits)
.map(split -> createCacheLoadTask(split, newCache))
- .collect(Collectors.toList());
- if (isStopped) {
- // check for cases when #close was called during reload before
creating cacheLoadTasks
- return;
- }
- // run first task or create numSplits threads to run all tasks
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ // run first task and create concurrencyLevel - 1 threads to run
remaining tasks
ExecutorService cacheLoadTaskService = null;
try {
- if (numSplits > 1) {
- int numThreads = getConcurrencyLevel(numSplits);
- cacheLoadTaskService =
Executors.newFixedThreadPool(numThreads);
- ExecutorService finalCacheLoadTaskService =
cacheLoadTaskService;
- List<Future<?>> futures =
+ InputSplitCacheLoadTask firstTask = cacheLoadTasks.pop();
+ CompletableFuture<?> otherTasksFuture = null;
+ if (!cacheLoadTasks.isEmpty()) {
+ cacheLoadTaskService =
Executors.newFixedThreadPool(concurrencyLevel - 1);
+ ExecutorService finalExecutor = cacheLoadTaskService;
Review Comment:
Otherwise it's impossible to refer to `cacheLoadTaskService` in subsequent
lambda because this variable is not final
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]