hejufang commented on code in PR #25837:
URL: https://github.com/apache/flink/pull/25837#discussion_r2013961018
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
* @param <UK> Type of the user entry key of state
* @param <UV> Type of the user entry value of state
*/
-class LatencyTrackingMapState<K, N, UK, UV>
- extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+ extends AbstractMetricsTrackState<
K,
N,
Map<UK, UV>,
InternalMapState<K, N, UK, UV>,
- LatencyTrackingMapState.MapStateLatencyMetrics>
+ MetricsTrackingMapState.MapStateMetrics>
implements InternalMapState<K, N, UK, UV> {
- LatencyTrackingMapState(
+
+ private TypeSerializer<UK> userKeySerializer;
+ private TypeSerializer<UV> userValueSerializer;
+
+ MetricsTrackingMapState(
String stateName,
InternalMapState<K, N, UK, UV> original,
- LatencyTrackingStateConfig latencyTrackingStateConfig) {
+ KeyedStateBackend<K> keyedStateBackend,
+ LatencyTrackingStateConfig latencyTrackingStateConfig,
+ SizeTrackingStateConfig sizeTrackingStateConfig) {
super(
original,
- new MapStateLatencyMetrics(
- stateName,
- latencyTrackingStateConfig.getMetricGroup(),
- latencyTrackingStateConfig.getSampleInterval(),
- latencyTrackingStateConfig.getHistorySize(),
- latencyTrackingStateConfig.isStateNameAsVariable()));
+ keyedStateBackend,
+ latencyTrackingStateConfig.isEnabled()
+ ? new MapStateMetrics(
+ stateName,
+ latencyTrackingStateConfig.getMetricGroup(),
+ latencyTrackingStateConfig.getSampleInterval(),
+ latencyTrackingStateConfig.getHistorySize(),
+
latencyTrackingStateConfig.isStateNameAsVariable())
+ : null,
+ sizeTrackingStateConfig.isEnabled()
+ ? new MapStateMetrics(
+ stateName,
+ sizeTrackingStateConfig.getMetricGroup(),
+ sizeTrackingStateConfig.getSampleInterval(),
+ sizeTrackingStateConfig.getHistorySize(),
+
sizeTrackingStateConfig.isStateNameAsVariable())
+ : null);
+ if (valueSerializer != null) {
+ MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK,
UV>) valueSerializer;
+ userKeySerializer = castedMapSerializer.getKeySerializer();
+ userValueSerializer = castedMapSerializer.getValueSerializer();
+ }
}
@Override
public UV get(UK key) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+ if (sizeTrackingStateMetric != null &&
sizeTrackingStateMetric.trackLatencyOnGet()) {
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_GET_KEY_SIZE,
sizeOfKeyAndUserKey(key));
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_GET_VALUE_SIZE,
sizeOfUserValue(original.get(key)));
+ }
+ if (latencyTrackingStateMetric != null &&
latencyTrackingStateMetric.trackLatencyOnGet()) {
return trackLatencyWithException(
- () -> original.get(key),
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+ () -> original.get(key),
MapStateMetrics.MAP_STATE_GET_LATENCY);
} else {
return original.get(key);
}
}
@Override
public void put(UK key, UV value) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+ if (sizeTrackingStateMetric != null &&
sizeTrackingStateMetric.trackLatencyOnPut()) {
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
sizeOfKeyAndUserKey(key));
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
sizeOfUserValue(value));
+ }
+ if (latencyTrackingStateMetric != null &&
latencyTrackingStateMetric.trackLatencyOnPut()) {
trackLatencyWithException(
- () -> original.put(key, value),
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+ () -> original.put(key, value),
MapStateMetrics.MAP_STATE_PUT_LATENCY);
} else {
original.put(key, value);
}
}
@Override
public void putAll(Map<UK, UV> map) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+ if (sizeTrackingStateMetric != null &&
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+ for (Map.Entry<UK, UV> entry : map.entrySet()) {
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+ sizeOfKeyAndUserKey(entry.getKey()));
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+ sizeOfUserValue(entry.getValue()));
+ }
+ }
+
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
trackLatencyWithException(
- () -> original.putAll(map),
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+ () -> original.putAll(map),
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
} else {
original.putAll(map);
}
}
@Override
public void remove(UK key) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnRemove()) {
trackLatencyWithException(
- () -> original.remove(key),
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+ () -> original.remove(key),
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
} else {
original.remove(key);
}
}
@Override
public boolean contains(UK key) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnContains()) {
return trackLatencyWithException(
- () -> original.contains(key),
- MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+ () -> original.contains(key),
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
} else {
return original.contains(key);
}
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
return trackLatencyWithException(
() -> new IterableWrapper<>(original.entries()),
- MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+ MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
} else {
return new IterableWrapper<>(original.entries());
}
}
@Override
public Iterable<UK> keys() throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
return trackLatencyWithException(
() -> new IterableWrapper<>(original.keys()),
- MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+ MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
} else {
return new IterableWrapper<>(original.keys());
}
}
@Override
public Iterable<UV> values() throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnValuesInit()) {
return trackLatencyWithException(
() -> new IterableWrapper<>(original.values()),
- MapStateLatencyMetrics.MAP_STATE_VALUES_INIT_LATENCY);
+ MapStateMetrics.MAP_STATE_VALUES_INIT_LATENCY);
} else {
return new IterableWrapper<>(original.values());
}
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnIteratorInit()) {
return trackLatencyWithException(
() -> new IteratorWrapper<>(original.iterator()),
- MapStateLatencyMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
+ MapStateMetrics.MAP_STATE_ITERATOR_INIT_LATENCY);
} else {
return new IteratorWrapper<>(original.iterator());
}
}
@Override
public boolean isEmpty() throws Exception {
Review Comment:
Thank you for your suggestion. I have added the metric
mapStateIsEmptyKeySize.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]