Eugene Kirpichov created BEAM-521:
-------------------------------------
Summary: Execute some file-based reads via composite transform
instead of custom source
Key: BEAM-521
URL: https://issues.apache.org/jira/browse/BEAM-521
Project: Beam
Issue Type: Improvement
Reporter: Eugene Kirpichov
The BoundedSource API is intended for cases where the source can provide
meaningful progress, dynamic splitting and size estimation. E.g. it's a good
fit for processing a moderate number of large files, or a key-value table.
However, existing runners have scalability limitations on how many bundles a
BoundedSource can split into, which makes it a very poor fit for processing
many small files: the source ends up splitting into too many bundles (at least
1 per file), overwhelming the runner.
This is a frequent use case, and the power of the BoundedSource API is not needed
in this case: small files don't need to be dynamically split, progress
estimation is not needed, and size estimation is a "nice-to-have" but not
entirely necessary.
In this case, it'd be better to execute the read not as a raw
Read.from(BoundedSource) executed natively by the runner, but as a
ParDo(splitIntoBundles) + fusion break + ParDo(read each bundle). That way the
bundles end up as a simple PCollection with no scalability limitations, and
most likely much smaller per-bundle overhead.
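A rough sketch of the three steps above, written in plain Java with no Beam dependency so the shape is easy to see. Everything here is hypothetical: the class and method names are made up for illustration, "files" are just names resolved through a caller-supplied reader function, and the fusion break is approximated by copying and shuffling a list, whereas a real runner would materialize and redistribute the bundles.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Beam.
final class CompositeReadSketch {

    // Step 1, "ParDo(splitIntoBundles)": small files are not split further,
    // so each file name becomes exactly one bundle.
    static List<String> splitIntoBundles(List<String> fileNames) {
        return new ArrayList<>(fileNames);
    }

    // Step 2, "fusion break": stand-in for redistributing bundles between
    // workers. Here it only copies and shuffles the list.
    static <T> List<T> fusionBreak(List<T> bundles) {
        List<T> redistributed = new ArrayList<>(bundles);
        Collections.shuffle(redistributed);
        return redistributed;
    }

    // Step 3, "ParDo(read each bundle)": read every record of one bundle
    // using the supplied reader (assumed to return the file's lines).
    static List<String> readBundle(String bundle,
                                   Function<String, List<String>> readFile) {
        return readFile.apply(bundle);
    }

    // The composite: split, break fusion, then read each bundle.
    static List<String> readAll(List<String> fileNames,
                                Function<String, List<String>> readFile) {
        List<String> records = new ArrayList<>();
        for (String bundle : fusionBreak(splitIntoBundles(fileNames))) {
            records.addAll(readBundle(bundle, readFile));
        }
        return records;
    }
}
```

The point of the sketch is that bundles are ordinary elements flowing between steps, so their count is bounded only by what a PCollection can hold. Record order across files is not preserved, which matches what a redistribution step would do.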
Implementation options:
- The BoundedSource API could provide a hint method telling Read.from() to
expand in this way
- Individual connectors, such as TextIO.Read, could switch between expanding
into Read.from() or into this composite transform depending on parameters (e.g.
TextIO.Read.withCompressionType(GZ) would always expand into the composite
transform, because for compressed files the BoundedSource API is unnecessary)
- Something else?
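
To make the first two options concrete, here is one possible shape for the decision, again in plain Java. All names are assumptions for illustration: ExpansionHint stands in for a hint method on the source (option 1), CompressionType loosely mirrors the GZ setting mentioned above, and choose() is what a connector might consult when expanding (option 2). This is not an existing Beam API.

```java
// Hypothetical illustration only; none of these names exist in Beam.
final class ExpansionChoiceSketch {

    // The two ways a read could be expanded.
    enum Expansion { NATIVE_READ, COMPOSITE_TRANSFORM }

    // Stand-in for a connector's compression setting.
    enum CompressionType { UNCOMPRESSED, GZ }

    // Option 1: a hint the source itself could expose.
    interface ExpansionHint {
        boolean prefersCompositeExpansion();
    }

    // Option 2: the connector decides from its own parameters, falling
    // back to the source's hint when the parameters do not force a choice.
    static Expansion choose(CompressionType compression, ExpansionHint hint) {
        if (compression == CompressionType.GZ) {
            // Per the issue text, compressed reads always use the composite.
            return Expansion.COMPOSITE_TRANSFORM;
        }
        return hint.prefersCompositeExpansion()
                ? Expansion.COMPOSITE_TRANSFORM
                : Expansion.NATIVE_READ;
    }
}
```

For example, choose(CompressionType.GZ, () -> false) yields COMPOSITE_TRANSFORM regardless of the hint, while an uncompressed read follows whatever the source reports.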