hejufang commented on code in PR #25837:
URL: https://github.com/apache/flink/pull/25837#discussion_r2009127739
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
* @param <UK> Type of the user entry key of state
* @param <UV> Type of the user entry value of state
*/
-class LatencyTrackingMapState<K, N, UK, UV>
- extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+ extends AbstractMetricsTrackState<
K,
N,
Map<UK, UV>,
InternalMapState<K, N, UK, UV>,
- LatencyTrackingMapState.MapStateLatencyMetrics>
+ MetricsTrackingMapState.MapStateMetrics>
implements InternalMapState<K, N, UK, UV> {
- LatencyTrackingMapState(
+
+ private TypeSerializer<UK> userKeySerializer;
+ private TypeSerializer<UV> userValueSerializer;
+
+ MetricsTrackingMapState(
String stateName,
InternalMapState<K, N, UK, UV> original,
- LatencyTrackingStateConfig latencyTrackingStateConfig) {
+ KeyedStateBackend<K> keyedStateBackend,
+ LatencyTrackingStateConfig latencyTrackingStateConfig,
+ SizeTrackingStateConfig sizeTrackingStateConfig) {
super(
original,
- new MapStateLatencyMetrics(
- stateName,
- latencyTrackingStateConfig.getMetricGroup(),
- latencyTrackingStateConfig.getSampleInterval(),
- latencyTrackingStateConfig.getHistorySize(),
- latencyTrackingStateConfig.isStateNameAsVariable()));
+ keyedStateBackend,
+ latencyTrackingStateConfig.isEnabled()
+ ? new MapStateMetrics(
+ stateName,
+ latencyTrackingStateConfig.getMetricGroup(),
+ latencyTrackingStateConfig.getSampleInterval(),
+ latencyTrackingStateConfig.getHistorySize(),
+
latencyTrackingStateConfig.isStateNameAsVariable())
+ : null,
+ sizeTrackingStateConfig.isEnabled()
+ ? new MapStateMetrics(
+ stateName,
+ sizeTrackingStateConfig.getMetricGroup(),
+ sizeTrackingStateConfig.getSampleInterval(),
+ sizeTrackingStateConfig.getHistorySize(),
+
sizeTrackingStateConfig.isStateNameAsVariable())
+ : null);
+ if (valueSerializer != null) {
+ MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK,
UV>) valueSerializer;
+ userKeySerializer = castedMapSerializer.getKeySerializer();
+ userValueSerializer = castedMapSerializer.getValueSerializer();
+ }
}
@Override
public UV get(UK key) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+ if (sizeTrackingStateMetric != null &&
sizeTrackingStateMetric.trackLatencyOnGet()) {
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_GET_KEY_SIZE,
sizeOfKeyAndUserKey(key));
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_GET_VALUE_SIZE,
sizeOfUserValue(original.get(key)));
+ }
+ if (latencyTrackingStateMetric != null &&
latencyTrackingStateMetric.trackLatencyOnGet()) {
return trackLatencyWithException(
- () -> original.get(key),
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+ () -> original.get(key),
MapStateMetrics.MAP_STATE_GET_LATENCY);
} else {
return original.get(key);
}
}
@Override
public void put(UK key, UV value) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+ if (sizeTrackingStateMetric != null &&
sizeTrackingStateMetric.trackLatencyOnPut()) {
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
sizeOfKeyAndUserKey(key));
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
sizeOfUserValue(value));
+ }
+ if (latencyTrackingStateMetric != null &&
latencyTrackingStateMetric.trackLatencyOnPut()) {
trackLatencyWithException(
- () -> original.put(key, value),
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+ () -> original.put(key, value),
MapStateMetrics.MAP_STATE_PUT_LATENCY);
} else {
original.put(key, value);
}
}
@Override
public void putAll(Map<UK, UV> map) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+ if (sizeTrackingStateMetric != null &&
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+ for (Map.Entry<UK, UV> entry : map.entrySet()) {
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+ sizeOfKeyAndUserKey(entry.getKey()));
+ sizeTrackingStateMetric.updateMetrics(
+ MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+ sizeOfUserValue(entry.getValue()));
+ }
+ }
+
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
trackLatencyWithException(
- () -> original.putAll(map),
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+ () -> original.putAll(map),
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
} else {
original.putAll(map);
}
}
@Override
public void remove(UK key) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnRemove()) {
trackLatencyWithException(
- () -> original.remove(key),
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+ () -> original.remove(key),
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
} else {
original.remove(key);
}
}
@Override
public boolean contains(UK key) throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnContains()) {
return trackLatencyWithException(
- () -> original.contains(key),
- MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+ () -> original.contains(key),
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
} else {
return original.contains(key);
}
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
return trackLatencyWithException(
() -> new IterableWrapper<>(original.entries()),
- MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+ MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
} else {
return new IterableWrapper<>(original.entries());
}
}
@Override
public Iterable<UK> keys() throws Exception {
- if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+ if (latencyTrackingStateMetric != null
+ && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
return trackLatencyWithException(
() -> new IterableWrapper<>(original.keys()),
- MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+ MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
} else {
return new IterableWrapper<>(original.keys());
}
}
@Override
public Iterable<UV> values() throws Exception {
Review Comment:
If we calculate the size of every value returned by this method, I believe the
overhead might be quite significant. Therefore, I added sampling logic to the
`Iterator.next` method instead, so that every traversal that goes through the
Iterator is still included in the statistics. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]