This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22241080a8013296a95c2b18cc17d5148befad4f Author: Stefan Richter <[email protected]> AuthorDate: Tue Jan 16 15:22:19 2024 +0100 [FLINK-34134] Introduce size/location statistics reporting to StateObject interface. --- .../apache/flink/runtime/state/StateObject.java | 57 ++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java index 8eaa1d68921..5aadebc5cd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state; import java.io.Serializable; +import java.util.EnumMap; /** * Base of all handles that represent checkpointed state in some form. The object may hold the @@ -63,4 +64,60 @@ public interface StateObject extends Serializable { * @return Size of the state in bytes. */ long getStateSize(); + + /** + * Collects statistics about state size and location from the state object. + * + * @implNote default implementation reports {@link StateObject#getStateSize()} as size and + * {@link StateObjectLocation#UNKNOWN} as location. + * @param collector the statistics collector. + */ + default void collectSizeStats(StateObjectSizeStatsCollector collector) { + collector.add(StateObjectLocation.UNKNOWN, getStateSize()); + } + + /** Enum for state locations. */ + enum StateObjectLocation { + LOCAL_MEMORY, + LOCAL_DISK, + REMOTE, + UNKNOWN, + } + + /** + * Collector for size and location stats from a state object via {@link + * StateObject#collectSizeStats(StateObjectSizeStatsCollector)}. + */ + final class StateObjectSizeStatsCollector { + private final EnumMap<StateObjectLocation, Long> stats; + + private StateObjectSizeStatsCollector() { + stats = new EnumMap<>(StateObjectLocation.class); + } + + public void add(StateObjectLocation key, long value) { + stats.compute( + key, + (k, v) -> { + if (v != null) { + return v + value; + } else { + return value; + } + }); + } + + public EnumMap<StateObjectLocation, Long> getStats() { + return stats; + } + + public static StateObjectSizeStatsCollector create() { + return new StateObjectSizeStatsCollector(); + } + + @Override + public String toString() { + return "StateObjectSizeStatsCollector{" + "stats=" + stats + '}'; + } + } }
