fredia commented on code in PR #25837:
URL: https://github.com/apache/flink/pull/25837#discussion_r2013300306


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
  * @param <UK> Type of the user entry key of state
  * @param <UV> Type of the user entry value of state
  */
-class LatencyTrackingMapState<K, N, UK, UV>
-        extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+        extends AbstractMetricsTrackState<
                 K,
                 N,
                 Map<UK, UV>,
                 InternalMapState<K, N, UK, UV>,
-                LatencyTrackingMapState.MapStateLatencyMetrics>
+                MetricsTrackingMapState.MapStateMetrics>
         implements InternalMapState<K, N, UK, UV> {
-    LatencyTrackingMapState(
+
+    private TypeSerializer<UK> userKeySerializer;
+    private TypeSerializer<UV> userValueSerializer;
+
+    MetricsTrackingMapState(
             String stateName,
             InternalMapState<K, N, UK, UV> original,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            KeyedStateBackend<K> keyedStateBackend,
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         super(
                 original,
-                new MapStateLatencyMetrics(
-                        stateName,
-                        latencyTrackingStateConfig.getMetricGroup(),
-                        latencyTrackingStateConfig.getSampleInterval(),
-                        latencyTrackingStateConfig.getHistorySize(),
-                        latencyTrackingStateConfig.isStateNameAsVariable()));
+                keyedStateBackend,
+                latencyTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                latencyTrackingStateConfig.getMetricGroup(),
+                                latencyTrackingStateConfig.getSampleInterval(),
+                                latencyTrackingStateConfig.getHistorySize(),
+                                
latencyTrackingStateConfig.isStateNameAsVariable())
+                        : null,
+                sizeTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                sizeTrackingStateConfig.getMetricGroup(),
+                                sizeTrackingStateConfig.getSampleInterval(),
+                                sizeTrackingStateConfig.getHistorySize(),
+                                
sizeTrackingStateConfig.isStateNameAsVariable())
+                        : null);
+        if (valueSerializer != null) {
+            MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, 
UV>) valueSerializer;
+            userKeySerializer = castedMapSerializer.getKeySerializer();
+            userValueSerializer = castedMapSerializer.getValueSerializer();
+        }
     }
 
     @Override
     public UV get(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnGet()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, 
sizeOfUserValue(original.get(key)));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnGet()) {
             return trackLatencyWithException(
-                    () -> original.get(key), 
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+                    () -> original.get(key), 
MapStateMetrics.MAP_STATE_GET_LATENCY);
         } else {
             return original.get(key);
         }
     }
 
     @Override
     public void put(UK key, UV value) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPut()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, 
sizeOfUserValue(value));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) {
             trackLatencyWithException(
-                    () -> original.put(key, value), 
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+                    () -> original.put(key, value), 
MapStateMetrics.MAP_STATE_PUT_LATENCY);
         } else {
             original.put(key, value);
         }
     }
 
     @Override
     public void putAll(Map<UK, UV> map) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+            for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+                        sizeOfKeyAndUserKey(entry.getKey()));
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+                        sizeOfUserValue(entry.getValue()));
+            }
+        }
+
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
             trackLatencyWithException(
-                    () -> original.putAll(map), 
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+                    () -> original.putAll(map), 
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
         } else {
             original.putAll(map);
         }
     }
 
     @Override
     public void remove(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnRemove()) {
             trackLatencyWithException(
-                    () -> original.remove(key), 
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+                    () -> original.remove(key), 
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
         } else {
             original.remove(key);
         }
     }
 
     @Override
     public boolean contains(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnContains()) {
             return trackLatencyWithException(
-                    () -> original.contains(key),
-                    MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+                    () -> original.contains(key), 
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
         } else {
             return original.contains(key);
         }
     }
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.entries()),
-                    MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.entries());
         }
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.keys()),
-                    MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.keys());
         }
     }
 
     @Override
     public Iterable<UV> values() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.values()),
-                    MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.values());
         }
     }
 
     @Override
     public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
             return trackLatencyWithException(
                     () -> new IteratorWrapper<>(original.iterator()),
-                    MapStateLatencyMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
         } else {
             return new IteratorWrapper<>(original.iterator());
         }
     }
 
     @Override
     public boolean isEmpty() throws Exception {

Review Comment:
   Different state backend has different implementations for `isEmpty`, 
regardless of whether the value exists or not, a key will be used to query it. 
Perhaps we can consider the size of current key to query?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
  * @param <UK> Type of the user entry key of state
  * @param <UV> Type of the user entry value of state
  */
-class LatencyTrackingMapState<K, N, UK, UV>
-        extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+        extends AbstractMetricsTrackState<
                 K,
                 N,
                 Map<UK, UV>,
                 InternalMapState<K, N, UK, UV>,
-                LatencyTrackingMapState.MapStateLatencyMetrics>
+                MetricsTrackingMapState.MapStateMetrics>
         implements InternalMapState<K, N, UK, UV> {
-    LatencyTrackingMapState(
+
+    private TypeSerializer<UK> userKeySerializer;
+    private TypeSerializer<UV> userValueSerializer;
+
+    MetricsTrackingMapState(
             String stateName,
             InternalMapState<K, N, UK, UV> original,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            KeyedStateBackend<K> keyedStateBackend,
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         super(
                 original,
-                new MapStateLatencyMetrics(
-                        stateName,
-                        latencyTrackingStateConfig.getMetricGroup(),
-                        latencyTrackingStateConfig.getSampleInterval(),
-                        latencyTrackingStateConfig.getHistorySize(),
-                        latencyTrackingStateConfig.isStateNameAsVariable()));
+                keyedStateBackend,
+                latencyTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                latencyTrackingStateConfig.getMetricGroup(),
+                                latencyTrackingStateConfig.getSampleInterval(),
+                                latencyTrackingStateConfig.getHistorySize(),
+                                
latencyTrackingStateConfig.isStateNameAsVariable())
+                        : null,
+                sizeTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                sizeTrackingStateConfig.getMetricGroup(),
+                                sizeTrackingStateConfig.getSampleInterval(),
+                                sizeTrackingStateConfig.getHistorySize(),
+                                
sizeTrackingStateConfig.isStateNameAsVariable())
+                        : null);
+        if (valueSerializer != null) {
+            MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, 
UV>) valueSerializer;
+            userKeySerializer = castedMapSerializer.getKeySerializer();
+            userValueSerializer = castedMapSerializer.getValueSerializer();
+        }
     }
 
     @Override
     public UV get(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnGet()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, 
sizeOfUserValue(original.get(key)));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnGet()) {
             return trackLatencyWithException(
-                    () -> original.get(key), 
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+                    () -> original.get(key), 
MapStateMetrics.MAP_STATE_GET_LATENCY);
         } else {
             return original.get(key);
         }
     }
 
     @Override
     public void put(UK key, UV value) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPut()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, 
sizeOfUserValue(value));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) {
             trackLatencyWithException(
-                    () -> original.put(key, value), 
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+                    () -> original.put(key, value), 
MapStateMetrics.MAP_STATE_PUT_LATENCY);
         } else {
             original.put(key, value);
         }
     }
 
     @Override
     public void putAll(Map<UK, UV> map) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+            for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+                        sizeOfKeyAndUserKey(entry.getKey()));
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+                        sizeOfUserValue(entry.getValue()));
+            }
+        }
+
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
             trackLatencyWithException(
-                    () -> original.putAll(map), 
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+                    () -> original.putAll(map), 
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
         } else {
             original.putAll(map);
         }
     }
 
     @Override
     public void remove(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnRemove()) {
             trackLatencyWithException(
-                    () -> original.remove(key), 
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+                    () -> original.remove(key), 
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
         } else {
             original.remove(key);
         }
     }
 
     @Override
     public boolean contains(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnContains()) {
             return trackLatencyWithException(
-                    () -> original.contains(key),
-                    MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+                    () -> original.contains(key), 
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
         } else {
             return original.contains(key);
         }
     }
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.entries()),
-                    MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.entries());
         }
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.keys()),
-                    MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.keys());
         }
     }
 
     @Override
     public Iterable<UV> values() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.values()),
-                    MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.values());
         }
     }
 
     @Override
     public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
             return trackLatencyWithException(
                     () -> new IteratorWrapper<>(original.iterator()),
-                    MapStateLatencyMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
         } else {
             return new IteratorWrapper<>(original.iterator());
         }
     }
 
     @Override
     public boolean isEmpty() throws Exception {

Review Comment:
   Different state backends have different implementations for `isEmpty`, 
regardless of whether the value exists or not, a key will be used to query it. 
Perhaps we can consider the size of current key to query?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to