This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7171f59  JIRA: [NEMO-275: Eager Garbage Collection for GroupByKey]
7171f59 is described below

commit 7171f59f0053c825d50ebc06d3d62da03185e015
Author: John Yang <[email protected]>
AuthorDate: Thu Nov 8 08:52:44 2018 +0900

    JIRA: [NEMO-275: Eager Garbage Collection for GroupByKey]
    
    JIRA: [NEMO-275: Eager Garbage Collection for GroupByKey](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-275)
    
    **Major changes:**
    - Instead of iterating over the accumulated elements and then clearing them all out at once, remove each element while iterating, so that it becomes eligible for garbage collection sooner.
---
 .../compiler/frontend/beam/transform/GroupByKeyTransform.java  | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 4e8edf0..0f4cf5b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -61,10 +61,12 @@ public final class GroupByKeyTransform<I> extends NoWatermarkEmitTransform<I, Wi
     if (keyToValues.isEmpty()) {
       LOG.warn("Beam GroupByKeyTransform received no data!");
     } else {
-      keyToValues.entrySet().stream().map(entry ->
-        WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())))
-        .forEach(outputCollector::emit);
-      keyToValues.clear();
+      final Iterator<Map.Entry<Object, List>> iterator = keyToValues.entrySet().iterator();
+      while (iterator.hasNext()) {
+        final Map.Entry<Object, List> entry = iterator.next();
+        outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())));
+        iterator.remove();
+      }
     }
   }
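
The remove-while-iterating pattern from this change can be tried in isolation. The following is a minimal sketch, not the Nemo implementation: the class `EagerDrainSketch`, the method `drain`, and the `emitter` callback are hypothetical names introduced for illustration, and a plain `BiConsumer` stands in for the real output collector. It assumes a map whose iterator supports `remove()`, such as `java.util.HashMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical stand-alone illustration of draining a map entry by entry. */
public final class EagerDrainSketch {
  private EagerDrainSketch() {
  }

  /**
   * Emits every (key, values) pair and removes it from the map right away,
   * so the map stops referencing each value list as soon as it has been emitted.
   * The emitter is an assumed stand-in for a real output collector.
   */
  static <K, V> void drain(final Map<K, List<V>> keyToValues,
                           final BiConsumer<K, List<V>> emitter) {
    final Iterator<Map.Entry<K, List<V>>> iterator = keyToValues.entrySet().iterator();
    while (iterator.hasNext()) {
      final Map.Entry<K, List<V>> entry = iterator.next();
      // Read key and value before removing the entry.
      emitter.accept(entry.getKey(), entry.getValue());
      // Removing through the iterator avoids ConcurrentModificationException.
      iterator.remove();
    }
  }

  public static void main(final String[] args) {
    final Map<String, List<Integer>> keyToValues = new HashMap<>();
    keyToValues.put("a", new ArrayList<>(Arrays.asList(1, 2)));
    keyToValues.put("b", new ArrayList<>(Arrays.asList(3)));

    drain(keyToValues, (key, values) -> System.out.println(key + " -> " + values));
    System.out.println("empty after drain: " + keyToValues.isEmpty());
  }
}
```

Removing through the iterator, rather than calling `keyToValues.remove(key)` inside the loop, is what keeps the traversal valid. Whether memory is actually reclaimed earlier also depends on the consumer not retaining the emitted lists.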
 
