fredia commented on code in PR #25837:
URL: https://github.com/apache/flink/pull/25837#discussion_r2006771109


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateFactory.java:
##########
@@ -34,35 +35,47 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/** Factory to create {@link AbstractLatencyTrackState}. */
-public class LatencyTrackingStateFactory<
+/** Factory to create {@link AbstractMetricsTrackState}. */
+public class MetricsTrackingStateFactory<
         K, N, V, S extends State, IS extends InternalKvState<K, N, ?>> {
 
     private final InternalKvState<K, N, ?> kvState;
     private final StateDescriptor<S, V> stateDescriptor;
     private final LatencyTrackingStateConfig latencyTrackingStateConfig;
+    private final SizeTrackingStateConfig sizeTrackingStateConfig;
     private final Map<StateDescriptor.Type, SupplierWithException<IS, 
Exception>> stateFactories;
+    private final KeyedStateBackend keyedStateBackend;
 
-    private LatencyTrackingStateFactory(
+    private MetricsTrackingStateFactory(
             InternalKvState<K, N, ?> kvState,
+            KeyedStateBackend<K> keyedStateBackend,
             StateDescriptor<S, V> stateDescriptor,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         this.kvState = kvState;
+        this.keyedStateBackend = keyedStateBackend;
         this.stateDescriptor = stateDescriptor;
         this.latencyTrackingStateConfig = latencyTrackingStateConfig;
+        this.sizeTrackingStateConfig = sizeTrackingStateConfig;
         this.stateFactories = createStateFactories();
     }
 
     /** Create latency tracking state if enabled. */
     public static <K, N, V, S extends State>
             InternalKvState<K, N, ?> 
createStateAndWrapWithLatencyTrackingIfEnabled(

Review Comment:
   nit: Rename to `createStateAndWrapWithMetricsTrackingIfEnabled`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/AbstractMetricsTrackState.java:
##########
@@ -137,18 +177,68 @@ protected void trackLatencyWithException(
         long startTime = System.nanoTime();
         runnable.run();
         long latency = System.nanoTime() - startTime;
-        latencyTrackingStateMetric.updateLatency(latencyLabel, latency);
+        latencyTrackingStateMetric.updateMetrics(latencyLabel, latency);
     }
 
     protected void trackLatency(Runnable runnable, String latencyLabel) {
         long startTime = System.nanoTime();
         runnable.run();
         long latency = System.nanoTime() - startTime;
-        latencyTrackingStateMetric.updateLatency(latencyLabel, latency);
+        latencyTrackingStateMetric.updateMetrics(latencyLabel, latency);
+    }
+
+    protected K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    public final N getCurrentNamespace() {
+        return currentNamespace;
+    }
+
+    protected long sizeOfKey() throws IOException {
+        if (keySerializer != null && keySerializer.getLength() == -1) {
+            try {
+                keySerializer.serialize(keyedStateBackend.getCurrentKey(), 
outputSerializer);
+                keySize = outputSerializer.length();
+            } finally {
+                outputSerializer.clear();
+            }
+        }
+
+        if (namespaceSerializer != null && namespaceSerializer.getLength() == 
-1) {
+            try {
+                namespaceSerializer.serialize(getCurrentNamespace(), 
outputSerializer);
+                namespaceSize = outputSerializer.length();
+            } finally {
+                outputSerializer.clear();
+            }
+        }
+
+        return keySize + namespaceSize;
+    }
+
+    protected long sizeOfValue(V value) throws IOException {
+        if (valueSerializer == null || value == null) {
+            return 0;

Review Comment:
   Add `valueSize=0`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/AbstractMetricsTrackState.java:
##########
@@ -28,24 +30,60 @@
 import java.util.function.Supplier;
 
 /**
- * Abstract implementation of latency tracking state.
+ * Abstract implementation of metrics tracking state.
  *
  * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <V> Type of the user entry value of state
  * @param <S> Type of the internal kv state
- * @param <LSM> Type of the latency tracking state metrics
+ * @param <LSM> Type of the metrics tracking state metrics
  */
-class AbstractLatencyTrackState<
-                K, N, V, S extends InternalKvState<K, N, V>, LSM extends 
StateLatencyMetricBase>
+class AbstractMetricsTrackState<
+                K, N, V, S extends InternalKvState<K, N, V>, LSM extends 
StateMetricBase>
         implements InternalKvState<K, N, V> {
 
     protected S original;
     protected LSM latencyTrackingStateMetric;
-
-    AbstractLatencyTrackState(S original, LSM latencyTrackingStateMetric) {
+    protected LSM sizeTrackingStateMetric;
+    protected KeyedStateBackend<K> keyedStateBackend;
+    protected N currentNamespace;
+    protected final TypeSerializer<K> keySerializer;
+    protected final TypeSerializer<N> namespaceSerializer;
+    protected final TypeSerializer<V> valueSerializer;
+    protected final DataOutputSerializer outputSerializer;
+    protected long namespaceSize;
+    protected long keySize;
+    protected long valueSize;
+
+    AbstractMetricsTrackState(
+            S original,
+            KeyedStateBackend<K> keyedStateBackend,
+            LSM latencyTrackingStateMetric,
+            LSM sizeTrackingStateMetric) {
         this.original = original;
+        this.keyedStateBackend = keyedStateBackend;
         this.latencyTrackingStateMetric = latencyTrackingStateMetric;
+        this.sizeTrackingStateMetric = sizeTrackingStateMetric;
+        this.keySerializer = getKeySerializer() != null ? 
getKeySerializer().duplicate() : null;
+        this.namespaceSerializer =
+                getNamespaceSerializer() != null ? 
getNamespaceSerializer().duplicate() : null;
+        this.valueSerializer =
+                getValueSerializer() != null ? 
getValueSerializer().duplicate() : null;
+        this.outputSerializer = new DataOutputSerializer(64);
+        initStateSize();
+    }
+
+    private void initStateSize() {
+        if (keySerializer != null) {
+            this.keySize = keySerializer.getLength() == -1 ? -1L : 
keySerializer.getLength();

Review Comment:
   How about 
   ```suggestion
              this.keySize = keySerializer.getLength();
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
  * @param <UK> Type of the user entry key of state
  * @param <UV> Type of the user entry value of state
  */
-class LatencyTrackingMapState<K, N, UK, UV>
-        extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+        extends AbstractMetricsTrackState<
                 K,
                 N,
                 Map<UK, UV>,
                 InternalMapState<K, N, UK, UV>,
-                LatencyTrackingMapState.MapStateLatencyMetrics>
+                MetricsTrackingMapState.MapStateMetrics>
         implements InternalMapState<K, N, UK, UV> {
-    LatencyTrackingMapState(
+
+    private TypeSerializer<UK> userKeySerializer;
+    private TypeSerializer<UV> userValueSerializer;
+
+    MetricsTrackingMapState(
             String stateName,
             InternalMapState<K, N, UK, UV> original,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            KeyedStateBackend<K> keyedStateBackend,
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         super(
                 original,
-                new MapStateLatencyMetrics(
-                        stateName,
-                        latencyTrackingStateConfig.getMetricGroup(),
-                        latencyTrackingStateConfig.getSampleInterval(),
-                        latencyTrackingStateConfig.getHistorySize(),
-                        latencyTrackingStateConfig.isStateNameAsVariable()));
+                keyedStateBackend,
+                latencyTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                latencyTrackingStateConfig.getMetricGroup(),
+                                latencyTrackingStateConfig.getSampleInterval(),
+                                latencyTrackingStateConfig.getHistorySize(),
+                                
latencyTrackingStateConfig.isStateNameAsVariable())
+                        : null,
+                sizeTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                sizeTrackingStateConfig.getMetricGroup(),
+                                sizeTrackingStateConfig.getSampleInterval(),
+                                sizeTrackingStateConfig.getHistorySize(),
+                                
sizeTrackingStateConfig.isStateNameAsVariable())
+                        : null);
+        if (valueSerializer != null) {
+            MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, 
UV>) valueSerializer;
+            userKeySerializer = castedMapSerializer.getKeySerializer();
+            userValueSerializer = castedMapSerializer.getValueSerializer();
+        }
     }
 
     @Override
     public UV get(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnGet()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, 
sizeOfUserValue(original.get(key)));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnGet()) {
             return trackLatencyWithException(
-                    () -> original.get(key), 
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+                    () -> original.get(key), 
MapStateMetrics.MAP_STATE_GET_LATENCY);
         } else {
             return original.get(key);
         }
     }
 
     @Override
     public void put(UK key, UV value) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPut()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, 
sizeOfUserValue(value));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) {
             trackLatencyWithException(
-                    () -> original.put(key, value), 
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+                    () -> original.put(key, value), 
MapStateMetrics.MAP_STATE_PUT_LATENCY);
         } else {
             original.put(key, value);
         }
     }
 
     @Override
     public void putAll(Map<UK, UV> map) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+            for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+                        sizeOfKeyAndUserKey(entry.getKey()));
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+                        sizeOfUserValue(entry.getValue()));
+            }
+        }
+
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
             trackLatencyWithException(
-                    () -> original.putAll(map), 
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+                    () -> original.putAll(map), 
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
         } else {
             original.putAll(map);
         }
     }
 
     @Override
     public void remove(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnRemove()) {
             trackLatencyWithException(
-                    () -> original.remove(key), 
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+                    () -> original.remove(key), 
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
         } else {
             original.remove(key);
         }
     }
 
     @Override
     public boolean contains(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnContains()) {
             return trackLatencyWithException(
-                    () -> original.contains(key),
-                    MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+                    () -> original.contains(key), 
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
         } else {
             return original.contains(key);
         }
     }
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.entries()),
-                    MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.entries());
         }
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.keys()),
-                    MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.keys());
         }
     }
 
     @Override
     public Iterable<UV> values() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.values()),
-                    MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.values());
         }
     }
 
     @Override
     public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {

Review Comment:
   Same as `values()`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
  * @param <UK> Type of the user entry key of state
  * @param <UV> Type of the user entry value of state
  */
-class LatencyTrackingMapState<K, N, UK, UV>
-        extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+        extends AbstractMetricsTrackState<
                 K,
                 N,
                 Map<UK, UV>,
                 InternalMapState<K, N, UK, UV>,
-                LatencyTrackingMapState.MapStateLatencyMetrics>
+                MetricsTrackingMapState.MapStateMetrics>
         implements InternalMapState<K, N, UK, UV> {
-    LatencyTrackingMapState(
+
+    private TypeSerializer<UK> userKeySerializer;
+    private TypeSerializer<UV> userValueSerializer;
+
+    MetricsTrackingMapState(
             String stateName,
             InternalMapState<K, N, UK, UV> original,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            KeyedStateBackend<K> keyedStateBackend,
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         super(
                 original,
-                new MapStateLatencyMetrics(
-                        stateName,
-                        latencyTrackingStateConfig.getMetricGroup(),
-                        latencyTrackingStateConfig.getSampleInterval(),
-                        latencyTrackingStateConfig.getHistorySize(),
-                        latencyTrackingStateConfig.isStateNameAsVariable()));
+                keyedStateBackend,
+                latencyTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                latencyTrackingStateConfig.getMetricGroup(),
+                                latencyTrackingStateConfig.getSampleInterval(),
+                                latencyTrackingStateConfig.getHistorySize(),
+                                
latencyTrackingStateConfig.isStateNameAsVariable())
+                        : null,
+                sizeTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                sizeTrackingStateConfig.getMetricGroup(),
+                                sizeTrackingStateConfig.getSampleInterval(),
+                                sizeTrackingStateConfig.getHistorySize(),
+                                
sizeTrackingStateConfig.isStateNameAsVariable())
+                        : null);
+        if (valueSerializer != null) {
+            MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, 
UV>) valueSerializer;
+            userKeySerializer = castedMapSerializer.getKeySerializer();
+            userValueSerializer = castedMapSerializer.getValueSerializer();
+        }
     }
 
     @Override
     public UV get(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnGet()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, 
sizeOfUserValue(original.get(key)));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnGet()) {
             return trackLatencyWithException(
-                    () -> original.get(key), 
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+                    () -> original.get(key), 
MapStateMetrics.MAP_STATE_GET_LATENCY);
         } else {
             return original.get(key);
         }
     }
 
     @Override
     public void put(UK key, UV value) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPut()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, 
sizeOfUserValue(value));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) {
             trackLatencyWithException(
-                    () -> original.put(key, value), 
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+                    () -> original.put(key, value), 
MapStateMetrics.MAP_STATE_PUT_LATENCY);
         } else {
             original.put(key, value);
         }
     }
 
     @Override
     public void putAll(Map<UK, UV> map) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+            for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+                        sizeOfKeyAndUserKey(entry.getKey()));
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+                        sizeOfUserValue(entry.getValue()));
+            }
+        }
+
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
             trackLatencyWithException(
-                    () -> original.putAll(map), 
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+                    () -> original.putAll(map), 
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
         } else {
             original.putAll(map);
         }
     }
 
     @Override
     public void remove(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnRemove()) {
             trackLatencyWithException(
-                    () -> original.remove(key), 
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+                    () -> original.remove(key), 
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
         } else {
             original.remove(key);
         }
     }
 
     @Override
     public boolean contains(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnContains()) {
             return trackLatencyWithException(
-                    () -> original.contains(key),
-                    MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+                    () -> original.contains(key), 
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
         } else {
             return original.contains(key);
         }
     }
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.entries()),
-                    MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.entries());
         }
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.keys()),
-                    MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.keys());
         }
     }
 
     @Override
     public Iterable<UV> values() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.values()),
-                    MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.values());
         }
     }
 
     @Override
     public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
             return trackLatencyWithException(
                     () -> new IteratorWrapper<>(original.iterator()),
-                    MapStateLatencyMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
         } else {
             return new IteratorWrapper<>(original.iterator());
         }
     }
 
     @Override
     public boolean isEmpty() throws Exception {

Review Comment:
   Same as `values()`, and other methods may also should update state-size 
metrics.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
  * @param <UK> Type of the user entry key of state
  * @param <UV> Type of the user entry value of state
  */
-class LatencyTrackingMapState<K, N, UK, UV>
-        extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+        extends AbstractMetricsTrackState<
                 K,
                 N,
                 Map<UK, UV>,
                 InternalMapState<K, N, UK, UV>,
-                LatencyTrackingMapState.MapStateLatencyMetrics>
+                MetricsTrackingMapState.MapStateMetrics>
         implements InternalMapState<K, N, UK, UV> {
-    LatencyTrackingMapState(
+
+    private TypeSerializer<UK> userKeySerializer;
+    private TypeSerializer<UV> userValueSerializer;
+
+    MetricsTrackingMapState(
             String stateName,
             InternalMapState<K, N, UK, UV> original,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            KeyedStateBackend<K> keyedStateBackend,
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         super(
                 original,
-                new MapStateLatencyMetrics(
-                        stateName,
-                        latencyTrackingStateConfig.getMetricGroup(),
-                        latencyTrackingStateConfig.getSampleInterval(),
-                        latencyTrackingStateConfig.getHistorySize(),
-                        latencyTrackingStateConfig.isStateNameAsVariable()));
+                keyedStateBackend,
+                latencyTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                latencyTrackingStateConfig.getMetricGroup(),
+                                latencyTrackingStateConfig.getSampleInterval(),
+                                latencyTrackingStateConfig.getHistorySize(),
+                                
latencyTrackingStateConfig.isStateNameAsVariable())
+                        : null,
+                sizeTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                sizeTrackingStateConfig.getMetricGroup(),
+                                sizeTrackingStateConfig.getSampleInterval(),
+                                sizeTrackingStateConfig.getHistorySize(),
+                                
sizeTrackingStateConfig.isStateNameAsVariable())
+                        : null);
+        if (valueSerializer != null) {
+            MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, 
UV>) valueSerializer;
+            userKeySerializer = castedMapSerializer.getKeySerializer();
+            userValueSerializer = castedMapSerializer.getValueSerializer();
+        }
     }
 
     @Override
     public UV get(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnGet()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, 
sizeOfUserValue(original.get(key)));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnGet()) {
             return trackLatencyWithException(
-                    () -> original.get(key), 
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+                    () -> original.get(key), 
MapStateMetrics.MAP_STATE_GET_LATENCY);
         } else {
             return original.get(key);
         }
     }
 
     @Override
     public void put(UK key, UV value) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPut()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, 
sizeOfUserValue(value));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) {
             trackLatencyWithException(
-                    () -> original.put(key, value), 
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+                    () -> original.put(key, value), 
MapStateMetrics.MAP_STATE_PUT_LATENCY);
         } else {
             original.put(key, value);
         }
     }
 
     @Override
     public void putAll(Map<UK, UV> map) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+            for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+                        sizeOfKeyAndUserKey(entry.getKey()));
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+                        sizeOfUserValue(entry.getValue()));
+            }
+        }
+
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
             trackLatencyWithException(
-                    () -> original.putAll(map), 
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+                    () -> original.putAll(map), 
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
         } else {
             original.putAll(map);
         }
     }
 
     @Override
     public void remove(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnRemove()) {
             trackLatencyWithException(
-                    () -> original.remove(key), 
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+                    () -> original.remove(key), 
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
         } else {
             original.remove(key);
         }
     }
 
     @Override
     public boolean contains(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnContains()) {
             return trackLatencyWithException(
-                    () -> original.contains(key),
-                    MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+                    () -> original.contains(key), 
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
         } else {
             return original.contains(key);
         }
     }
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.entries()),
-                    MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.entries());
         }
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.keys()),
-                    MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.keys());
         }
     }
 
     @Override
     public Iterable<UV> values() throws Exception {

Review Comment:
   Should `MAP_STATE_GET_KEY_SIZE/MAP_STATE_GET_VALUE_SIZE` be updated here?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to