Zakelly commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r647960969
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +102,66 @@ public boolean contains(UK key) throws Exception {
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
- return delegatedState.entries();
+ Iterator<Map.Entry<UK, UV>> iterator = delegatedState.iterator();
+ return () -> getEntryIterator(iterator);
+ }
+
+ private Iterator<Map.Entry<UK, UV>>
getEntryIterator(Iterator<Map.Entry<UK, UV>> iterator) {
+ final N currentNamespace = getCurrentNamespace();
+ return loggingIterator(
+ new Iterator<Map.Entry<UK, UV>>() {
+ @Override
+ public Map.Entry<UK, UV> next() {
+ return loggingMapEntry(
+ iterator.next(), changeLogger, changeWriter,
currentNamespace);
Review comment:
Yeah, that's what I mean.
Actually, before this change, there is a lambda creation as well (see
[here](https://github.com/apache/flink/commit/e76855e230c997fc1e1507e1a54a6a08b2cf4a31#diff-50480dff268c9d790da42d0edb430b91dff4012c861056faf076d465b6080910L86)).
I make a simple performance test using JMH (I also attach the source code here
[LambdaBench.java.zip](https://github.com/apache/flink/files/6621076/LambdaBench.java.zip)):
```
@Benchmark
public void lambdaGetWriter() throws Exception {
Map.Entry<Integer, Integer> entry = this.entry;
calling(getWriter(entry.getKey(), entry.getValue()));
}
@Benchmark
public void lambdaPredefinedWriter() throws Exception {
Map.Entry<Integer, Integer> entry = this.entry;
calling((out) -> predefinedWriter.accept(entry, out));
}
@Benchmark
public void lambdaInline() throws Exception {
Map.Entry<Integer, Integer> entry = this.entry;
calling((out) -> {
write(entry.getKey(), out);
write(entry.getValue(), out);
});
}
private BiConsumerWithException<
Entry<Integer, Integer>, Void, IOException> predefinedWriter =
(entry, out) -> {
write(entry.getKey(), out);
write(entry.getValue(), out);
};
```
It simulate and compare the calling procedure of the lambda before and after
the change. The result is here:
```
Benchmark Mode Cnt Score Error
Units
LambdaBench.lambdaGetWriter thrpt 10 336522.176 ± 9458.113
ops/ms
LambdaBench.lambdaInline thrpt 10 332052.779 ± 10503.985
ops/ms
LambdaBench.lambdaPredefinedWriter thrpt 10 263978.110 ± 10960.900
ops/ms
```
The ```getWriter``` performs no worse than other ways. However, it's just a
simulation and may be different from how it really is. We may do more
investigation on the memory consumption, GC and performance after completing
the ChangeLogStateBackend.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]