This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22241080a8013296a95c2b18cc17d5148befad4f
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Jan 16 15:22:19 2024 +0100

    [FLINK-34134] Introduce size/location statistics reporting to StateObject interface.
---
 .../apache/flink/runtime/state/StateObject.java    | 57 ++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 8eaa1d68921..5aadebc5cd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import java.io.Serializable;
+import java.util.EnumMap;
 
 /**
  * Base of all handles that represent checkpointed state in some form. The object may hold the
@@ -63,4 +64,60 @@ public interface StateObject extends Serializable {
      * @return Size of the state in bytes.
      */
     long getStateSize();
+
+    /**
+     * Collects statistics about state size and location from the state object.
+     *
+     * @implNote default implementation reports {@link StateObject#getStateSize()} as size and
+     *     {@link StateObjectLocation#UNKNOWN} as location.
+     * @param collector the statistics collector that receives the size/location entry.
+     */
+    default void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        collector.add(StateObjectLocation.UNKNOWN, getStateSize());
+    }
+
+    /** Location keys under which {@link StateObjectSizeStatsCollector} sums state sizes. */
+    enum StateObjectLocation {
+        LOCAL_MEMORY,
+        LOCAL_DISK,
+        REMOTE,
+        UNKNOWN, // reported by the default StateObject#collectSizeStats implementation
+    }
+
+    /**
+     * Accumulates the sizes reported through {@link
+     * StateObject#collectSizeStats(StateObjectSizeStatsCollector)} into one sum per location.
+     */
+    final class StateObjectSizeStatsCollector {
+        private final EnumMap<StateObjectLocation, Long> stats; // summed size per location
+
+        private StateObjectSizeStatsCollector() { // private: instances come from create()
+            stats = new EnumMap<>(StateObjectLocation.class);
+        }
+
+        public void add(StateObjectLocation key, long value) {
+            stats.compute( // merge value into the entry stored for key
+                    key,
+                    (k, v) -> {
+                        if (v != null) { // location already has a sum: add to it
+                            return v + value;
+                        } else { // first value reported for this location
+                            return value;
+                        }
+                    });
+        }
+
+        public EnumMap<StateObjectLocation, Long> getStats() { // the backing map, not a copy
+            return stats;
+        }
+
+        public static StateObjectSizeStatsCollector create() {
+            return new StateObjectSizeStatsCollector();
+        }
+
+        @Override
+        public String toString() {
+            return "StateObjectSizeStatsCollector{" + "stats=" + stats + '}';
+        }
+    }
 }

Reply via email to