dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117202797
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java: ##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+    private final Map<JobVertexID, Long> averages;
+
+    public StateSizeEstimates() {
+        this(Collections.emptyMap());
+    }
+
+    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+        this.averages = averages;
+    }
+
+    public Optional<Long> estimate(JobVertexID jobVertexId) {
+        return Optional.ofNullable(averages.get(jobVertexId));
+    }
+
+    static StateSizeEstimates empty() {
+        return new StateSizeEstimates();
+    }
+
+    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+        return Optional.ofNullable(executionGraph)
+                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+                .map(
+                        cp ->
+                                build(
+                                        fromCompletedCheckpoint(cp),
+                                        mapVerticesToOperators(executionGraph)))
+                .orElse(empty());
+    }
+
+    private static StateSizeEstimates build(
+            Map<OperatorID, Long> sizePerOperator,
+            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+        Map<JobVertexID, Long> verticesToSizes =
+                verticesToOperators.entrySet().stream()
+                        .collect(
+                                toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+        return new StateSizeEstimates(verticesToSizes);
+    }
+
+    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+        return ids.stream()
+                .mapToLong(key -> sizes.getOrDefault(key, 0L))
+                .boxed()
+                .reduce(Long::sum)
+                .orElse(0L);
+    }
+
+    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+            ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+    }
+
+    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+        return v.getOperatorIDs().stream()
+                .map(OperatorIDPair::getGeneratedOperatorID)
+                .collect(Collectors.toSet());
+    }
+
+    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+        Stream<Map.Entry<OperatorID, OperatorState>> states =
+                cp.getOperatorStates().entrySet().stream();
+        Map<OperatorID, Long> estimates =
+                states.collect(
+                        toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+        return estimates;
+    }
+
+    private static long estimateKeyGroupStateSize(OperatorState state) {
+        Stream<KeyedStateHandle> handles =
+                state.getSubtaskStates().values().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream());
+        Stream<Tuple2<Long, Integer>> sizeAndCount =
+                handles.map(
+                        h ->
+                                Tuple2.of(
+                                        h.getStateSize(),
+                                        h.getKeyGroupRange().getNumberOfKeyGroups()));
+        Optional<Tuple2<Long, Integer>> totalSizeAndCount =
+                sizeAndCount.reduce(
+                        (left, right) -> Tuple2.of(left.f0 + right.f0, left.f1 + right.f1));
+        Optional<Long> average = totalSizeAndCount.filter(t2 -> t2.f1 > 0).map(t2 -> t2.f0 / t2.f1);
+        return average.orElse(0L);
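For orientation, a minimal usage sketch of the class above, assuming scheduler-side code living in the same package (the helper name and the surrounding wiring are hypothetical, not part of this diff):

package org.apache.flink.runtime.scheduler.adaptive.allocator;

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;

/** Hypothetical helper, for illustration only; not part of the PR. */
class StateSizeEstimatesUsageSketch {

    /**
     * Looks up the per-vertex state size estimate derived from the latest completed checkpoint,
     * falling back to 0 when no checkpoint or no estimate for the vertex is available.
     */
    static long estimatedBytesPerKeyGroup(ExecutionGraph executionGraph, JobVertexID vertexId) {
        StateSizeEstimates estimates = StateSizeEstimates.fromGraph(executionGraph);
        return estimates.estimate(vertexId).orElse(0L);
    }
}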
Review Comment:

> But ideally, we don't aggregate across all subtasks: it's possible to query by KeyGroupRange and then compute some aggregate for it (I'd implement this in a separate PR though, to keep this one smaller).

Agreed, we shouldn't evolve this PR any further in this regard.

> In fact, I think it's more susceptible to suboptimal decisions.

Can you expand on that? My feeling is that there are a few extreme cases:

1) one large state + one small state (IMO quite common, e.g. buffer + watermark hold)
2) multiple states of approximately the same size

Assumption: it's better to move a smaller state than a big one.

For 1), max performs better, because an average would let the small state mask the big one, even though the price of moving the small one is insignificant.
For 2), both aggregates would perform approximately the same.

Are there any other cases you have in mind? My problem with percentiles is that they'd actually ignore the worst cases, which are the most costly to move/split/merge.
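To make the two cases concrete, here is a toy sketch with invented numbers that compares max against a plain average as the per-vertex aggregate (illustrative only, not code from this PR):

import java.util.Arrays;
import java.util.List;

/** Toy comparison of max vs. average aggregation of state sizes; numbers are invented. */
public class AggregateComparisonSketch {

    static long max(List<Long> sizes) {
        return sizes.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    static long average(List<Long> sizes) {
        return (long) sizes.stream().mapToLong(Long::longValue).average().orElse(0d);
    }

    public static void main(String[] args) {
        // Case 1: one large state plus one small one (e.g. buffer + watermark hold).
        // The average dilutes the 10 GB state that is expensive to move; max keeps it visible.
        List<Long> skewed = Arrays.asList(10_000_000_000L, 1_000_000L);
        System.out.printf("skewed : max=%d avg=%d%n", max(skewed), average(skewed));

        // Case 2: several states of roughly the same size; both aggregates are close.
        List<Long> uniform = Arrays.asList(2_000_000_000L, 2_100_000_000L, 1_900_000_000L);
        System.out.printf("uniform: max=%d avg=%d%n", max(uniform), average(uniform));
    }
}

With the skewed input, the average reports roughly half of the 10 GB that would actually have to be moved, while max reports it in full; with the uniform input the two agree.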
