[ https://issues.apache.org/jira/browse/BEAM-9346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Szuberski resolved BEAM-9346.
-----------------------------------
Fix Version/s: Not applicable
Resolution: Fixed
> TFRecordIO inefficient read from sideinput causing pipeline to be slow
> ----------------------------------------------------------------------
>
> Key: BEAM-9346
> URL: https://issues.apache.org/jira/browse/BEAM-9346
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Ban Piao
> Assignee: Piotr Szuberski
> Priority: Major
> Labels: dataflow, easyfix, performance
> Fix For: Not applicable
>
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()),
> ListCoder.of(resultCoder)) is an inefficient way to read a large side
> input.
> The pipeline can be sped up significantly by combining the PCollection<ResultT>
> into a single-element PCollection<List<ResultT>>.
> Sample code:
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412
> from
> ```
> return input
>     .getPipeline()
>     .apply(Reify.viewInGlobalWindow(input.apply(View.asList()),
>         ListCoder.of(resultCoder)));
> ```
> to
> ```
> return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
> ```
> where ToListCombineFn is defined as
> ```
> // Required imports (at the top of the enclosing file):
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>
> // Collects every input element into a single List.
> public static class ToListCombineFn<ResultT>
>     extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {
>   @Override
>   public List<ResultT> createAccumulator() {
>     return new ArrayList<>();
>   }
>
>   @Override
>   public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
>     mutableAccumulator.add(input);
>     return mutableAccumulator;
>   }
>
>   @Override
>   public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
>     Iterator<List<ResultT>> iter = accumulators.iterator();
>     if (!iter.hasNext()) {
>       return new ArrayList<>();
>     }
>     // Reuse the first accumulator and append the remaining ones to it.
>     List<ResultT> merged = iter.next();
>     while (iter.hasNext()) {
>       merged.addAll(iter.next());
>     }
>     return merged;
>   }
>
>   @Override
>   public List<ResultT> extractOutput(List<ResultT> accumulator) {
>     return accumulator;
>   }
> }
> ```
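To illustrate how the four methods of ToListCombineFn fit together, here is a Beam-free sketch. It copies the same logic into a plain class (LocalToListFn, a hypothetical name with no Beam dependency) and drives it roughly the way a combine conceptually runs: one accumulator per bundle, then a single merge, then extractOutput. The split into "bundles" is an assumption made for illustration; a real runner chooses bundling itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical Beam-free copy of ToListCombineFn's logic, plus a tiny
// stand-in "runner" (combineBundles) that is NOT how Beam is implemented.
public class LocalToListFn<T> {

    public List<T> createAccumulator() {
        return new ArrayList<>();
    }

    public List<T> addInput(List<T> acc, T input) {
        acc.add(input);
        return acc;
    }

    // Reuses (mutates) the first accumulator, as ToListCombineFn does.
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        Iterator<List<T>> iter = accumulators.iterator();
        if (!iter.hasNext()) {
            return new ArrayList<>();
        }
        List<T> merged = iter.next();
        while (iter.hasNext()) {
            merged.addAll(iter.next());
        }
        return merged;
    }

    public List<T> extractOutput(List<T> acc) {
        return acc;
    }

    // Each inner list plays the role of one bundle (assumed split).
    public static <T> List<T> combineBundles(List<List<T>> bundles) {
        LocalToListFn<T> fn = new LocalToListFn<>();
        List<List<T>> partials = new ArrayList<>();
        for (List<T> bundle : bundles) {
            List<T> acc = fn.createAccumulator();
            for (T element : bundle) {
                acc = fn.addInput(acc, element);
            }
            partials.add(acc);
        }
        // One merge produces the single List that downstream steps consume.
        return fn.extractOutput(fn.mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<List<String>> bundles = Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.<String>asList());
        System.out.println(combineBundles(bundles)); // prints [a, b, c]
    }
}
```

In this sketch the merged order is deterministic; in a real Beam pipeline the order of elements in the resulting list should not be relied upon, since PCollections are unordered and the runner decides how partial accumulators are merged.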
--
This message was sent by Atlassian Jira
(v8.3.4#803005)