echauchot commented on a change in pull request #11055: [BEAM-9436] Improve GBK in spark structured streaming runner
URL: https://github.com/apache/beam/pull/11055#discussion_r397246846
##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
##########
@@ -65,9 +65,15 @@ public GroupAlsoByWindowViaOutputBufferFn(
   @Override
   public Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(
-      KV<K, Iterable<WindowedValue<InputT>>> kv) throws Exception {
-    K key = kv.getKey();
-    Iterable<WindowedValue<InputT>> values = kv.getValue();
+      K key, Iterator<WindowedValue<KV<K, InputT>>> iterator) throws Exception {
+
+    // we have to meterialize the Iterator because ReduceFnRunner.processElements expects
+    // ArrayList<WindowedValue<InputT>> and not Iterator<WindowedValue<KV<K, InputT>>>

Review comment:
I have already taken a look. Each ReduceFnRunner instance runs in parallel within the groups of flatMapGroups and works on a materialization of the elements of its group. See, among other things, ReduceFnRunner line 326: ReduceFnRunner collects all the windows in order to merge them. Changing the signature of the processElements method would not remove the need for materialization. The important thing is to avoid materializing the whole collection in one place, which we do not do: we materialize only the part of the collection that belongs to each group.
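
To illustrate the per-group materialization discussed in the review comment, here is a minimal sketch in plain Java. It uses no Beam or Spark types and is not the code from the PR: `PerGroupMaterialization` and `materializeGroup` are hypothetical names, and `Map.Entry<K, V>` is an assumed stand-in for `WindowedValue<KV<K, InputT>>`. The only point is that the iterator of a single group is drained into a list, so memory use is bounded by the size of that group rather than by the whole collection.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the actual GroupAlsoByWindowViaOutputBufferFn.
class PerGroupMaterialization {

  // Drains the iterator of ONE group into a list, dropping the repeated key.
  // Map.Entry<K, V> is an assumed stand-in for WindowedValue<KV<K, InputT>>.
  static <K, V> List<V> materializeGroup(K key, Iterator<Map.Entry<K, V>> iterator) {
    List<V> values = new ArrayList<>();
    while (iterator.hasNext()) {
      values.add(iterator.next().getValue());
    }
    return values;
  }

  public static void main(String[] args) {
    // One group for key "a"; other groups would be handled by separate calls.
    List<Map.Entry<String, Integer>> group = new ArrayList<>();
    group.add(new SimpleEntry<>("a", 1));
    group.add(new SimpleEntry<>("a", 2));

    List<Integer> values = materializeGroup("a", group.iterator());
    System.out.println(values); // prints [1, 2]
  }
}
```

Each call sees only the elements of its own group, so several such calls can run in parallel without any one of them holding the full dataset.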