Zakelly commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r647960969



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +102,66 @@ public boolean contains(UK key) throws Exception {
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        return delegatedState.entries();
+        Iterator<Map.Entry<UK, UV>> iterator = delegatedState.iterator();
+        return () -> getEntryIterator(iterator);
+    }
+
+    private Iterator<Map.Entry<UK, UV>> 
getEntryIterator(Iterator<Map.Entry<UK, UV>> iterator) {
+        final N currentNamespace = getCurrentNamespace();
+        return loggingIterator(
+                new Iterator<Map.Entry<UK, UV>>() {
+                    @Override
+                    public Map.Entry<UK, UV> next() {
+                        return loggingMapEntry(
+                                iterator.next(), changeLogger, changeWriter, 
currentNamespace);

Review comment:
       Yeah, that's what I mean.
   
   Actually, before this change, there is a lambda creation as well (see 
[here](https://github.com/apache/flink/commit/e76855e230c997fc1e1507e1a54a6a08b2cf4a31#diff-50480dff268c9d790da42d0edb430b91dff4012c861056faf076d465b6080910L86)).
 I make a simple performance test using JMH (I also attach the source code here 
[LambdaBench.java.zip](https://github.com/apache/flink/files/6621076/LambdaBench.java.zip)):
   ```
       @Benchmark
       public void lambdaGetWriter() throws Exception {
           Map.Entry<Integer, Integer> entry = this.entry;
           calling(getWriter(entry.getKey(), entry.getValue()));
       }
   
       @Benchmark
       public void lambdaPredefinedWriter() throws Exception {
           Map.Entry<Integer, Integer> entry = this.entry;
           calling((out) -> predefinedWriter.accept(entry, out));
       }
   
       @Benchmark
       public void lambdaInline() throws Exception {
           Map.Entry<Integer, Integer> entry = this.entry;
           calling((out) -> {
               write(entry.getKey(), out);
               write(entry.getValue(), out);
           });
       }
   
       private BiConsumerWithException<
               Entry<Integer, Integer>, Void, IOException> predefinedWriter =  
(entry, out) -> {
           write(entry.getKey(), out);
           write(entry.getValue(), out);
       };
   ```
   It simulate and compare the calling procedure of the lambda before and after 
the change. The result is here:
   ```
   Benchmark                            Mode  Cnt       Score       Error   
Units
   LambdaBench.lambdaGetWriter         thrpt   10  336522.176 ±  9458.113  
ops/ms
   LambdaBench.lambdaInline            thrpt   10  332052.779 ± 10503.985  
ops/ms
   LambdaBench.lambdaPredefinedWriter  thrpt   10  263978.110 ± 10960.900  
ops/ms
   ```
   The ```getWriter``` performs no worse than other ways. However, it's just a 
simulation and may be different from how it really is. We may do more 
investigation on the memory consumption, GC and performance after completing 
the ChangeLogStateBackend.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to