fredia commented on code in PR #25837: URL: https://github.com/apache/flink/pull/25837#discussion_r2006771109
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateFactory.java: ########## @@ -34,35 +35,47 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -/** Factory to create {@link AbstractLatencyTrackState}. */ -public class LatencyTrackingStateFactory< +/** Factory to create {@link AbstractMetricsTrackState}. */ +public class MetricsTrackingStateFactory< K, N, V, S extends State, IS extends InternalKvState<K, N, ?>> { private final InternalKvState<K, N, ?> kvState; private final StateDescriptor<S, V> stateDescriptor; private final LatencyTrackingStateConfig latencyTrackingStateConfig; + private final SizeTrackingStateConfig sizeTrackingStateConfig; private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories; + private final KeyedStateBackend keyedStateBackend; - private LatencyTrackingStateFactory( + private MetricsTrackingStateFactory( InternalKvState<K, N, ?> kvState, + KeyedStateBackend<K> keyedStateBackend, StateDescriptor<S, V> stateDescriptor, - LatencyTrackingStateConfig latencyTrackingStateConfig) { + LatencyTrackingStateConfig latencyTrackingStateConfig, + SizeTrackingStateConfig sizeTrackingStateConfig) { this.kvState = kvState; + this.keyedStateBackend = keyedStateBackend; this.stateDescriptor = stateDescriptor; this.latencyTrackingStateConfig = latencyTrackingStateConfig; + this.sizeTrackingStateConfig = sizeTrackingStateConfig; this.stateFactories = createStateFactories(); } /** Create latency tracking state if enabled. 
*/ public static <K, N, V, S extends State> InternalKvState<K, N, ?> createStateAndWrapWithLatencyTrackingIfEnabled( Review Comment: nit: Rename to `createStateAndWrapWithMetricsTrackingIfEnabled` ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/AbstractMetricsTrackState.java: ########## @@ -137,18 +177,68 @@ protected void trackLatencyWithException( long startTime = System.nanoTime(); runnable.run(); long latency = System.nanoTime() - startTime; - latencyTrackingStateMetric.updateLatency(latencyLabel, latency); + latencyTrackingStateMetric.updateMetrics(latencyLabel, latency); } protected void trackLatency(Runnable runnable, String latencyLabel) { long startTime = System.nanoTime(); runnable.run(); long latency = System.nanoTime() - startTime; - latencyTrackingStateMetric.updateLatency(latencyLabel, latency); + latencyTrackingStateMetric.updateMetrics(latencyLabel, latency); + } + + protected K getCurrentKey() { + return keyedStateBackend.getCurrentKey(); + } + + public final N getCurrentNamespace() { + return currentNamespace; + } + + protected long sizeOfKey() throws IOException { + if (keySerializer != null && keySerializer.getLength() == -1) { + try { + keySerializer.serialize(keyedStateBackend.getCurrentKey(), outputSerializer); + keySize = outputSerializer.length(); + } finally { + outputSerializer.clear(); + } + } + + if (namespaceSerializer != null && namespaceSerializer.getLength() == -1) { + try { + namespaceSerializer.serialize(getCurrentNamespace(), outputSerializer); + namespaceSize = outputSerializer.length(); + } finally { + outputSerializer.clear(); + } + } + + return keySize + namespaceSize; + } + + protected long sizeOfValue(V value) throws IOException { + if (valueSerializer == null || value == null) { + return 0; Review Comment: Add `valueSize=0`? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/AbstractMetricsTrackState.java: ########## @@ -28,24 +30,60 @@ import java.util.function.Supplier; /** - * Abstract implementation of latency tracking state. + * Abstract implementation of metrics tracking state. * * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <V> Type of the user entry value of state * @param <S> Type of the internal kv state - * @param <LSM> Type of the latency tracking state metrics + * @param <LSM> Type of the metrics tracking state metrics */ -class AbstractLatencyTrackState< - K, N, V, S extends InternalKvState<K, N, V>, LSM extends StateLatencyMetricBase> +class AbstractMetricsTrackState< + K, N, V, S extends InternalKvState<K, N, V>, LSM extends StateMetricBase> implements InternalKvState<K, N, V> { protected S original; protected LSM latencyTrackingStateMetric; - - AbstractLatencyTrackState(S original, LSM latencyTrackingStateMetric) { + protected LSM sizeTrackingStateMetric; + protected KeyedStateBackend<K> keyedStateBackend; + protected N currentNamespace; + protected final TypeSerializer<K> keySerializer; + protected final TypeSerializer<N> namespaceSerializer; + protected final TypeSerializer<V> valueSerializer; + protected final DataOutputSerializer outputSerializer; + protected long namespaceSize; + protected long keySize; + protected long valueSize; + + AbstractMetricsTrackState( + S original, + KeyedStateBackend<K> keyedStateBackend, + LSM latencyTrackingStateMetric, + LSM sizeTrackingStateMetric) { this.original = original; + this.keyedStateBackend = keyedStateBackend; this.latencyTrackingStateMetric = latencyTrackingStateMetric; + this.sizeTrackingStateMetric = sizeTrackingStateMetric; + this.keySerializer = getKeySerializer() != null ? getKeySerializer().duplicate() : null; + this.namespaceSerializer = + getNamespaceSerializer() != null ? 
getNamespaceSerializer().duplicate() : null; + this.valueSerializer = + getValueSerializer() != null ? getValueSerializer().duplicate() : null; + this.outputSerializer = new DataOutputSerializer(64); + initStateSize(); + } + + private void initStateSize() { + if (keySerializer != null) { + this.keySize = keySerializer.getLength() == -1 ? -1L : keySerializer.getLength(); Review Comment: How about ```suggestion this.keySize = keySerializer.getLength(); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java: ########## @@ -33,133 +37,223 @@ * @param <UK> Type of the user entry key of state * @param <UV> Type of the user entry value of state */ -class LatencyTrackingMapState<K, N, UK, UV> - extends AbstractLatencyTrackState< +class MetricsTrackingMapState<K, N, UK, UV> + extends AbstractMetricsTrackState< K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>, - LatencyTrackingMapState.MapStateLatencyMetrics> + MetricsTrackingMapState.MapStateMetrics> implements InternalMapState<K, N, UK, UV> { - LatencyTrackingMapState( + + private TypeSerializer<UK> userKeySerializer; + private TypeSerializer<UV> userValueSerializer; + + MetricsTrackingMapState( String stateName, InternalMapState<K, N, UK, UV> original, - LatencyTrackingStateConfig latencyTrackingStateConfig) { + KeyedStateBackend<K> keyedStateBackend, + LatencyTrackingStateConfig latencyTrackingStateConfig, + SizeTrackingStateConfig sizeTrackingStateConfig) { super( original, - new MapStateLatencyMetrics( - stateName, - latencyTrackingStateConfig.getMetricGroup(), - latencyTrackingStateConfig.getSampleInterval(), - latencyTrackingStateConfig.getHistorySize(), - latencyTrackingStateConfig.isStateNameAsVariable())); + keyedStateBackend, + latencyTrackingStateConfig.isEnabled() + ? 
new MapStateMetrics( + stateName, + latencyTrackingStateConfig.getMetricGroup(), + latencyTrackingStateConfig.getSampleInterval(), + latencyTrackingStateConfig.getHistorySize(), + latencyTrackingStateConfig.isStateNameAsVariable()) + : null, + sizeTrackingStateConfig.isEnabled() + ? new MapStateMetrics( + stateName, + sizeTrackingStateConfig.getMetricGroup(), + sizeTrackingStateConfig.getSampleInterval(), + sizeTrackingStateConfig.getHistorySize(), + sizeTrackingStateConfig.isStateNameAsVariable()) + : null); + if (valueSerializer != null) { + MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, UV>) valueSerializer; + userKeySerializer = castedMapSerializer.getKeySerializer(); + userValueSerializer = castedMapSerializer.getValueSerializer(); + } } @Override public UV get(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnGet()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnGet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, sizeOfUserValue(original.get(key))); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnGet()) { return trackLatencyWithException( - () -> original.get(key), MapStateLatencyMetrics.MAP_STATE_GET_LATENCY); + () -> original.get(key), MapStateMetrics.MAP_STATE_GET_LATENCY); } else { return original.get(key); } } @Override public void put(UK key, UV value) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPut()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPut()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, sizeOfUserValue(value)); + } + if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) { trackLatencyWithException( - () -> original.put(key, value), MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY); + () -> original.put(key, value), MapStateMetrics.MAP_STATE_PUT_LATENCY); } else { original.put(key, value); } } @Override public void putAll(Map<UK, UV> map) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPutAll()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPutAll()) { + for (Map.Entry<UK, UV> entry : map.entrySet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, + sizeOfKeyAndUserKey(entry.getKey())); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, + sizeOfUserValue(entry.getValue())); + } + } + + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnPutAll()) { trackLatencyWithException( - () -> original.putAll(map), MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY); + () -> original.putAll(map), MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY); } else { original.putAll(map); } } @Override public void remove(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnRemove()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnRemove()) { trackLatencyWithException( - () -> original.remove(key), MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY); + () -> original.remove(key), MapStateMetrics.MAP_STATE_REMOVE_LATENCY); } else { original.remove(key); } } @Override public boolean contains(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnContains()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnContains()) { return trackLatencyWithException( - () -> original.contains(key), - MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY); + () -> original.contains(key), MapStateMetrics.MAP_STATE_CONTAINS_LATENCY); } else { return original.contains(key); } 
} @Override public Iterable<Map.Entry<UK, UV>> entries() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.entries()), - MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); + MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); } else { return new IterableWrapper<>(original.entries()); } } @Override public Iterable<UK> keys() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnKeysInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.keys()), - MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY); + MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY); } else { return new IterableWrapper<>(original.keys()); } } @Override public Iterable<UV> values() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnValuesInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.values()), - MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY); + MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY); } else { return new IterableWrapper<>(original.values()); } } @Override public Iterator<Map.Entry<UK, UV>> iterator() throws Exception { Review Comment: Same as `values()` ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java: ########## @@ -33,133 +37,223 @@ * @param <UK> Type of the user entry key of state * @param <UV> Type of the user entry value of state */ -class LatencyTrackingMapState<K, N, UK, UV> - extends AbstractLatencyTrackState< +class MetricsTrackingMapState<K, N, UK, UV> + extends AbstractMetricsTrackState< K, N, Map<UK, UV>, InternalMapState<K, N, 
UK, UV>, - LatencyTrackingMapState.MapStateLatencyMetrics> + MetricsTrackingMapState.MapStateMetrics> implements InternalMapState<K, N, UK, UV> { - LatencyTrackingMapState( + + private TypeSerializer<UK> userKeySerializer; + private TypeSerializer<UV> userValueSerializer; + + MetricsTrackingMapState( String stateName, InternalMapState<K, N, UK, UV> original, - LatencyTrackingStateConfig latencyTrackingStateConfig) { + KeyedStateBackend<K> keyedStateBackend, + LatencyTrackingStateConfig latencyTrackingStateConfig, + SizeTrackingStateConfig sizeTrackingStateConfig) { super( original, - new MapStateLatencyMetrics( - stateName, - latencyTrackingStateConfig.getMetricGroup(), - latencyTrackingStateConfig.getSampleInterval(), - latencyTrackingStateConfig.getHistorySize(), - latencyTrackingStateConfig.isStateNameAsVariable())); + keyedStateBackend, + latencyTrackingStateConfig.isEnabled() + ? new MapStateMetrics( + stateName, + latencyTrackingStateConfig.getMetricGroup(), + latencyTrackingStateConfig.getSampleInterval(), + latencyTrackingStateConfig.getHistorySize(), + latencyTrackingStateConfig.isStateNameAsVariable()) + : null, + sizeTrackingStateConfig.isEnabled() + ? 
new MapStateMetrics( + stateName, + sizeTrackingStateConfig.getMetricGroup(), + sizeTrackingStateConfig.getSampleInterval(), + sizeTrackingStateConfig.getHistorySize(), + sizeTrackingStateConfig.isStateNameAsVariable()) + : null); + if (valueSerializer != null) { + MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, UV>) valueSerializer; + userKeySerializer = castedMapSerializer.getKeySerializer(); + userValueSerializer = castedMapSerializer.getValueSerializer(); + } } @Override public UV get(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnGet()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnGet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, sizeOfUserValue(original.get(key))); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnGet()) { return trackLatencyWithException( - () -> original.get(key), MapStateLatencyMetrics.MAP_STATE_GET_LATENCY); + () -> original.get(key), MapStateMetrics.MAP_STATE_GET_LATENCY); } else { return original.get(key); } } @Override public void put(UK key, UV value) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPut()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPut()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, sizeOfUserValue(value)); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnPut()) { trackLatencyWithException( - () -> original.put(key, value), MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY); + () -> original.put(key, value), MapStateMetrics.MAP_STATE_PUT_LATENCY); } else { original.put(key, value); } } @Override public void 
putAll(Map<UK, UV> map) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPutAll()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPutAll()) { + for (Map.Entry<UK, UV> entry : map.entrySet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, + sizeOfKeyAndUserKey(entry.getKey())); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, + sizeOfUserValue(entry.getValue())); + } + } + + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnPutAll()) { trackLatencyWithException( - () -> original.putAll(map), MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY); + () -> original.putAll(map), MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY); } else { original.putAll(map); } } @Override public void remove(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnRemove()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnRemove()) { trackLatencyWithException( - () -> original.remove(key), MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY); + () -> original.remove(key), MapStateMetrics.MAP_STATE_REMOVE_LATENCY); } else { original.remove(key); } } @Override public boolean contains(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnContains()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnContains()) { return trackLatencyWithException( - () -> original.contains(key), - MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY); + () -> original.contains(key), MapStateMetrics.MAP_STATE_CONTAINS_LATENCY); } else { return original.contains(key); } } @Override public Iterable<Map.Entry<UK, UV>> entries() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { return trackLatencyWithException( () -> new 
IterableWrapper<>(original.entries()), - MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); + MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); } else { return new IterableWrapper<>(original.entries()); } } @Override public Iterable<UK> keys() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnKeysInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.keys()), - MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY); + MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY); } else { return new IterableWrapper<>(original.keys()); } } @Override public Iterable<UV> values() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnValuesInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.values()), - MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY); + MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY); } else { return new IterableWrapper<>(original.values()); } } @Override public Iterator<Map.Entry<UK, UV>> iterator() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnIteratorInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnIteratorInit()) { return trackLatencyWithException( () -> new IteratorWrapper<>(original.iterator()), - MapStateLatencyMetrics.MAP_STATE_ITERATOR_INIT_LATENCY); + MapStateMetrics.MAP_STATE_ITERATOR_INIT_LATENCY); } else { return new IteratorWrapper<>(original.iterator()); } } @Override public boolean isEmpty() throws Exception { Review Comment: Same as `values()`; other methods should probably also update the state-size metrics.
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java: ########## @@ -33,133 +37,223 @@ * @param <UK> Type of the user entry key of state * @param <UV> Type of the user entry value of state */ -class LatencyTrackingMapState<K, N, UK, UV> - extends AbstractLatencyTrackState< +class MetricsTrackingMapState<K, N, UK, UV> + extends AbstractMetricsTrackState< K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>, - LatencyTrackingMapState.MapStateLatencyMetrics> + MetricsTrackingMapState.MapStateMetrics> implements InternalMapState<K, N, UK, UV> { - LatencyTrackingMapState( + + private TypeSerializer<UK> userKeySerializer; + private TypeSerializer<UV> userValueSerializer; + + MetricsTrackingMapState( String stateName, InternalMapState<K, N, UK, UV> original, - LatencyTrackingStateConfig latencyTrackingStateConfig) { + KeyedStateBackend<K> keyedStateBackend, + LatencyTrackingStateConfig latencyTrackingStateConfig, + SizeTrackingStateConfig sizeTrackingStateConfig) { super( original, - new MapStateLatencyMetrics( - stateName, - latencyTrackingStateConfig.getMetricGroup(), - latencyTrackingStateConfig.getSampleInterval(), - latencyTrackingStateConfig.getHistorySize(), - latencyTrackingStateConfig.isStateNameAsVariable())); + keyedStateBackend, + latencyTrackingStateConfig.isEnabled() + ? new MapStateMetrics( + stateName, + latencyTrackingStateConfig.getMetricGroup(), + latencyTrackingStateConfig.getSampleInterval(), + latencyTrackingStateConfig.getHistorySize(), + latencyTrackingStateConfig.isStateNameAsVariable()) + : null, + sizeTrackingStateConfig.isEnabled() + ? 
new MapStateMetrics( + stateName, + sizeTrackingStateConfig.getMetricGroup(), + sizeTrackingStateConfig.getSampleInterval(), + sizeTrackingStateConfig.getHistorySize(), + sizeTrackingStateConfig.isStateNameAsVariable()) + : null); + if (valueSerializer != null) { + MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, UV>) valueSerializer; + userKeySerializer = castedMapSerializer.getKeySerializer(); + userValueSerializer = castedMapSerializer.getValueSerializer(); + } } @Override public UV get(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnGet()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnGet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, sizeOfUserValue(original.get(key))); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnGet()) { return trackLatencyWithException( - () -> original.get(key), MapStateLatencyMetrics.MAP_STATE_GET_LATENCY); + () -> original.get(key), MapStateMetrics.MAP_STATE_GET_LATENCY); } else { return original.get(key); } } @Override public void put(UK key, UV value) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPut()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPut()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, sizeOfUserValue(value)); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnPut()) { trackLatencyWithException( - () -> original.put(key, value), MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY); + () -> original.put(key, value), MapStateMetrics.MAP_STATE_PUT_LATENCY); } else { original.put(key, value); } } @Override public void 
putAll(Map<UK, UV> map) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPutAll()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPutAll()) { + for (Map.Entry<UK, UV> entry : map.entrySet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, + sizeOfKeyAndUserKey(entry.getKey())); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, + sizeOfUserValue(entry.getValue())); + } + } + + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnPutAll()) { trackLatencyWithException( - () -> original.putAll(map), MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY); + () -> original.putAll(map), MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY); } else { original.putAll(map); } } @Override public void remove(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnRemove()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnRemove()) { trackLatencyWithException( - () -> original.remove(key), MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY); + () -> original.remove(key), MapStateMetrics.MAP_STATE_REMOVE_LATENCY); } else { original.remove(key); } } @Override public boolean contains(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnContains()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnContains()) { return trackLatencyWithException( - () -> original.contains(key), - MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY); + () -> original.contains(key), MapStateMetrics.MAP_STATE_CONTAINS_LATENCY); } else { return original.contains(key); } } @Override public Iterable<Map.Entry<UK, UV>> entries() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { return trackLatencyWithException( () -> new 
IterableWrapper<>(original.entries()), - MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); + MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); } else { return new IterableWrapper<>(original.entries()); } } @Override public Iterable<UK> keys() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnKeysInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.keys()), - MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY); + MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY); } else { return new IterableWrapper<>(original.keys()); } } @Override public Iterable<UV> values() throws Exception { Review Comment: Should `MAP_STATE_GET_KEY_SIZE/MAP_STATE_GET_VALUE_SIZE` be updated here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org