rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r647792583
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +102,66 @@ public boolean contains(UK key) throws Exception {
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
- return delegatedState.entries();
+ Iterator<Map.Entry<UK, UV>> iterator = delegatedState.iterator();
+ return () -> getEntryIterator(iterator);
+ }
+
+ private Iterator<Map.Entry<UK, UV>> getEntryIterator(Iterator<Map.Entry<UK, UV>> iterator) {
+ final N currentNamespace = getCurrentNamespace();
+ return loggingIterator(
+ new Iterator<Map.Entry<UK, UV>>() {
+ @Override
+ public Map.Entry<UK, UV> next() {
+ return loggingMapEntry(
+ iterator.next(), changeLogger, changeWriter, currentNamespace);
Review comment:
Good idea. I added a commit to #15200 to implement this:
e76855e230c997fc1e1507e1a54a6a08b2cf4a31
Is that what you mean?
I'm a bit concerned about performance, though: we'd be creating a lambda on
each state change, and that overhead can be significant with the heap state backend.
WDYT?
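
To make the allocation concern concrete, here is a minimal sketch of the two shapes such logging code could take. It is not the code from the commit above: `RecordingLog`, `EntryWriter`, `putCapturing` and `putShared` are hypothetical names invented for this illustration, and the "log" is just a list of strings. The first variant builds a lambda that captures the key and value on every call, which on common JVMs usually means a fresh object per state change. The second passes the data as arguments to a writer that is created once and reused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LambdaPerChangeSketch {

    /** Hypothetical writer that receives the data as arguments instead of capturing it. */
    interface EntryWriter<K, V> {
        void write(K key, V value, StringBuilder out);
    }

    /** Hypothetical stand-in for a change log; not a Flink class. */
    static final class RecordingLog {
        final List<String> records = new ArrayList<>();

        // Variant A: the caller supplies a lambda that has captured the key and value.
        void appendCapturing(Consumer<StringBuilder> writer) {
            StringBuilder out = new StringBuilder();
            writer.accept(out);
            records.add(out.toString());
        }

        // Variant B: the data travels as arguments, so the writer can be a shared instance.
        <K, V> void appendWithArgs(K key, V value, EntryWriter<K, V> writer) {
            StringBuilder out = new StringBuilder();
            writer.write(key, value, out);
            records.add(out.toString());
        }
    }

    // Non-capturing writer, created once and reused for every change.
    static final EntryWriter<String, Integer> SHARED_WRITER =
            (key, value, out) -> out.append(key).append('=').append(value);

    static void putCapturing(RecordingLog log, String key, Integer value) {
        // This lambda captures key and value, so a new instance is typically
        // allocated each time this statement runs (the JIT may or may not remove it).
        log.appendCapturing(out -> out.append(key).append('=').append(value));
    }

    static void putShared(RecordingLog log, String key, Integer value) {
        // No per-call lambda: only the arguments are passed along.
        log.appendWithArgs(key, value, SHARED_WRITER);
    }

    public static void main(String[] args) {
        RecordingLog log = new RecordingLog();
        putCapturing(log, "a", 1);
        putShared(log, "a", 1);
        System.out.println(log.records);
    }
}
```

Both variants record the same entry; they differ only in whether a new functional object is needed per change. Whether that difference matters in practice would need a benchmark against the heap backend, since escape analysis can sometimes eliminate the allocation.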