[ https://issues.apache.org/jira/browse/BEAM-9346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Szuberski resolved BEAM-9346.
-----------------------------------
    Fix Version/s: Not applicable
       Resolution: Fixed

> TFRecordIO inefficient read from sideinput causing pipeline to be slow
> ----------------------------------------------------------------------
>
>                 Key: BEAM-9346
>                 URL: https://issues.apache.org/jira/browse/BEAM-9346
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Ban Piao
>            Assignee: Piotr Szuberski
>            Priority: Major
>              Labels: dataflow, easyfix, performance
>             Fix For: Not applicable
>
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder))
> is an inefficient way to read a large side input.
> The pipeline can be sped up significantly by combining the PCollection<ResultT>
> into a single-element PCollection<List<ResultT>>.
> Sample change (see
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412),
> from
> ```
> return input
>     .getPipeline()
>     .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
> ```
> to
> ```
> return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
> ```
> where ToListCombineFn is defined as
> ```
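> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>
> // Collects every input element into one List, so that Combine.globally
> // emits a single-element PCollection<List<ResultT>>.
> public static class ToListCombineFn<ResultT>
>     extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {
>   @Override
>   public List<ResultT> createAccumulator() {
>     return new ArrayList<>();
>   }
>
>   @Override
>   public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
>     mutableAccumulator.add(input);
>     return mutableAccumulator;
>   }
>
>   // CombineFn permits mergeAccumulators to modify and return one of its inputs,
>   // so the first accumulator is reused instead of copying into a fresh list.
>   @Override
>   public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
>     Iterator<List<ResultT>> iter = accumulators.iterator();
>     if (!iter.hasNext()) {
>       return new ArrayList<>();
>     }
>     List<ResultT> merged = iter.next();
>     while (iter.hasNext()) {
>       merged.addAll(iter.next());
>     }
>     return merged;
>   }
>
>   @Override
>   public List<ResultT> extractOutput(List<ResultT> accumulator) {
>     return accumulator;
>   }
> }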
> ```
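The accumulator methods of ToListCombineFn can be exercised without a Beam runner. The sketch below copies the same logic into a plain class with no Beam dependency. `ToListDemo`, `ToListFn` and `combineBundles` are hypothetical names. `combineBundles` is a simplified assumption of how partial results could be accumulated per bundle and then merged. It is not Beam's actual execution model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ToListDemo {

  // Hypothetical stand-in: same method bodies as ToListCombineFn, minus the Beam base class.
  static class ToListFn<T> {
    List<T> createAccumulator() {
      return new ArrayList<>();
    }

    List<T> addInput(List<T> acc, T input) {
      acc.add(input);
      return acc;
    }

    List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
      Iterator<List<T>> iter = accumulators.iterator();
      if (!iter.hasNext()) {
        return new ArrayList<>();
      }
      List<T> merged = iter.next(); // reuse the first accumulator
      while (iter.hasNext()) {
        merged.addAll(iter.next());
      }
      return merged;
    }

    List<T> extractOutput(List<T> acc) {
      return acc;
    }
  }

  // Assumed simplification of a runner: accumulate each bundle separately,
  // then merge the partial accumulators into one list.
  static <T> List<T> combineBundles(ToListFn<T> fn, List<List<T>> bundles) {
    List<List<T>> partials = new ArrayList<>();
    for (List<T> bundle : bundles) {
      List<T> acc = fn.createAccumulator();
      for (T element : bundle) {
        acc = fn.addInput(acc, element);
      }
      partials.add(acc);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<String> result = combineBundles(
        new ToListFn<String>(),
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
    System.out.println(result); // prints [a, b, c]
  }
}
```

The combine produces exactly one list, no matter how many bundles feed into it. That single element is what replaces the multi-element View.asList() side input in the proposal.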



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
