Ban Piao created BEAM-9346:
------------------------------
Summary: TFRecordIO inefficient read from sideinput causing
pipeline to be slow
Key: BEAM-9346
URL: https://issues.apache.org/jira/browse/BEAM-9346
Project: Beam
Issue Type: Improvement
Components: sdk-java-core
Reporter: Ban Piao
In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()),
ListCoder.of(resultCoder)) is an inefficient way to read a large side
input.
The pipeline can be sped up significantly by combining the PCollection<ResultT>
into a single-element PCollection<List<ResultT>>.
Sample code:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412
from
```
return input
    .getPipeline()
    .apply(
        Reify.viewInGlobalWindow(
            input.apply(View.asList()), ListCoder.of(resultCoder)));
```
to
```
return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
```
where ToListCombineFn is defined as
```
// Imports needed by the enclosing file:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Collects every input element into a single List<ResultT>.
public static class ToListCombineFn<ResultT>
    extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {

  @Override
  public List<ResultT> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
    mutableAccumulator.add(input);
    return mutableAccumulator;
  }

  @Override
  public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
    Iterator<List<ResultT>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return new ArrayList<>();
    }
    // Reuse the first accumulator and append the rest to it.
    List<ResultT> merged = iter.next();
    while (iter.hasNext()) {
      merged.addAll(iter.next());
    }
    return merged;
  }

  @Override
  public List<ResultT> extractOutput(List<ResultT> accumulator) {
    return accumulator;
  }
}
```
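To illustrate how the four methods above cooperate, here is a minimal plain-Java sketch with no Beam dependency. It assumes a simplified model in which each bundle gets its own accumulator and all partial accumulators are merged once at the end. The class name ToListCombineSketch and the helper combineGlobally are hypothetical and only mimic the lifecycle; a real runner decides how inputs are bundled and does not guarantee element order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java stand-in for the combine lifecycle; all names here are hypothetical. */
public class ToListCombineSketch {

  // The same four steps as ToListCombineFn, written as static helpers.
  static <T> List<T> createAccumulator() {
    return new ArrayList<>();
  }

  static <T> List<T> addInput(List<T> accumulator, T input) {
    accumulator.add(input);
    return accumulator;
  }

  static <T> List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
    Iterator<List<T>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return new ArrayList<>();
    }
    // Reuse the first accumulator and append the rest to it.
    List<T> merged = iter.next();
    while (iter.hasNext()) {
      merged.addAll(iter.next());
    }
    return merged;
  }

  static <T> List<T> extractOutput(List<T> accumulator) {
    return accumulator;
  }

  // Assumed model of a global combine: one accumulator per bundle, then one merge.
  static <T> List<T> combineGlobally(List<List<T>> bundles) {
    List<List<T>> partials = new ArrayList<>();
    for (List<T> bundle : bundles) {
      List<T> accumulator = createAccumulator();
      for (T element : bundle) {
        accumulator = addInput(accumulator, element);
      }
      partials.add(accumulator);
    }
    return extractOutput(mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<String>> bundles =
        Arrays.asList(Arrays.asList("shard-0", "shard-1"), Arrays.asList("shard-2"));
    // Prints [shard-0, shard-1, shard-2]
    System.out.println(combineGlobally(bundles));
  }
}
```

In this model the output is always a single list, including an empty list when there are no bundles, which corresponds to the single-element PCollection<List<ResultT>> described above.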
--
This message was sent by Atlassian Jira
(v8.3.4#803005)