rkhachatryan commented on code in PR #22046:
URL: https://github.com/apache/flink/pull/22046#discussion_r1681407852
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -198,4 +195,18 @@ public Collection<AllocationScore> calculateScore(
.map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
.collect(Collectors.toList());
}
+
+ private static long estimateSize(
+ KeyGroupRange newRange, VertexAllocationInformation allocation) {
+ KeyGroupRange oldRange = allocation.getKeyGroupRange();
+ if (allocation.stateSizeInBytes * oldRange.getNumberOfKeyGroups() == 0) {
+ return 0L;
+ }
+ // round up to 1
+ long keyGroupSize =
Review Comment:
Yes, I think the state size of different subtasks might vary significantly
even if the number of key groups is the same, because of hot keys/groups.
I also remember some patterns of working with state where a single subtask
(subtask 0) handles all the state.
Besides that, with slot sharing, a single slot can be reserved for multiple
tasks with completely different state sizes and parallelisms. Without state
size estimation, stateless high-parallelism tasks might outweigh state-heavy
low-parallelism tasks during scoring.
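
For illustration, here is a minimal standalone sketch of the kind of estimate discussed above (not the PR's exact code; the class name, the simplified signature, and the assumption that old state is spread evenly across its key groups are mine): derive a per-key-group size from the previous allocation, round it up to at least 1 byte so non-empty state never scores as zero, and multiply by the number of key groups that overlap the new range.

```java
// Hypothetical sketch, not the code under review: estimates how much of a subtask's
// previous state would land on a new key-group range, assuming the old state is
// spread evenly across the old key groups.
public final class StateSizeEstimateSketch {

    static long estimateSize(
            long oldStateSizeInBytes, int oldNumberOfKeyGroups, int overlappingKeyGroups) {
        if (oldStateSizeInBytes == 0 || oldNumberOfKeyGroups == 0) {
            return 0L;
        }
        // round up to 1 so that non-empty state never contributes a zero score
        long keyGroupSize = Math.max(1L, oldStateSizeInBytes / oldNumberOfKeyGroups);
        return overlappingKeyGroups * keyGroupSize;
    }

    public static void main(String[] args) {
        // e.g. 10 MiB of state previously spread over 128 key groups,
        // 32 of which fall into the new range -> roughly a quarter of the state
        System.out.println(estimateSize(10L << 20, 128, 32));
    }
}
```

Without such a size-based weighting, the score would effectively count key groups only, which is exactly the skew described above when slot sharing mixes stateless high-parallelism tasks with state-heavy low-parallelism ones.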