rkhachatryan commented on a change in pull request #15200:
URL: https://github.com/apache/flink/pull/15200#discussion_r647574512
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
##########
@@ -39,53 +44,83 @@
extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, V>>
implements InternalListState<K, N, V> {
- ChangelogListState(InternalListState<K, N, V> delegatedState) {
- super(delegatedState);
+ ChangelogListState(
+ InternalListState<K, N, V> delegatedState,
+ KvStateChangeLogger<List<V>, N> changeLogger) {
+ super(delegatedState, changeLogger);
}
@Override
public void update(List<V> values) throws Exception {
+ changeLogger.stateUpdated(values, getCurrentNamespace());
delegatedState.update(values);
}
@Override
public void addAll(List<V> values) throws Exception {
+ changeLogger.stateAdded(values, getCurrentNamespace());
delegatedState.addAll(values);
}
@Override
public void updateInternal(List<V> valueToStore) throws Exception {
+ changeLogger.stateUpdated(valueToStore, getCurrentNamespace());
delegatedState.updateInternal(valueToStore);
}
Review comment:
TL;DR: I'm adding a new operation, `SET_INTERNAL`.
As discussed offline: `update` treats the provided value as user-supplied,
so it performs some checks and protective copying (e.g. in the heap state
backend).
If left as is, both `update` and `updateInternal` are logged via
`stateUpdated`, so on recovery a different method might be called
(`updateInternal` instead of `update`, or vice versa). That probably won't
cause any issues for now, but I'll separate these operations to be on the
safe side.
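To illustrate the idea, here is a minimal sketch of how a separate operation tag lets recovery call the same method that was originally invoked. It is a simplified model, not the actual Flink code: `ChangelogReplaySketch`, `Op`, `Change`, `RecordingState`, `replay`, and the `SET` and `ADD` tags are hypothetical names chosen for illustration; only `SET_INTERNAL` comes from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; these are not the actual Flink classes.
final class ChangelogReplaySketch {

    /** One tag per state method, so replay can pick the same method again. */
    enum Op { SET, SET_INTERNAL, ADD }

    /** A logged change: which operation ran and with which values. */
    static final class Change {
        final Op op;
        final List<String> values;

        Change(Op op, List<String> values) {
            this.op = op;
            this.values = values;
        }
    }

    /** Stand-in for the delegated list state; it records which method ran. */
    static final class RecordingState {
        final List<String> calls = new ArrayList<>();
        List<String> stored = new ArrayList<>();

        void update(List<String> values) {
            calls.add("update");
            // User-facing path: protective copy of the supplied list.
            stored = new ArrayList<>(values);
        }

        void updateInternal(List<String> values) {
            calls.add("updateInternal");
            // Internal path: stores the list as given, without copying.
            stored = values;
        }

        void addAll(List<String> values) {
            calls.add("addAll");
            stored.addAll(values);
        }
    }

    /** Replays logged changes, dispatching each to the method that produced it. */
    static void replay(List<Change> log, RecordingState state) {
        for (Change change : log) {
            switch (change.op) {
                case SET:
                    state.update(change.values);
                    break;
                case SET_INTERNAL:
                    state.updateInternal(change.values);
                    break;
                case ADD:
                    state.addAll(change.values);
                    break;
            }
        }
    }
}
```

With a single tag for both `update` and `updateInternal`, `replay` would have to pick one of the two methods for every logged change; the extra tag removes that ambiguity.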