Eugene Kirpichov created BEAM-521:
-------------------------------------

             Summary: Execute some file-based reads via composite transform instead of custom source
                 Key: BEAM-521
                 URL: https://issues.apache.org/jira/browse/BEAM-521
             Project: Beam
          Issue Type: Improvement
            Reporter: Eugene Kirpichov


The BoundedSource API is intended for cases where the source can provide 
meaningful progress, dynamic splitting and size estimation. For example, it is a 
good fit for processing a moderate number of large files, or a key-value table.

However, existing runners have scalability limitations on how many bundles a 
BoundedSource can split into. This makes it a very poor fit for processing many 
small files: the source ends up splitting into too many bundles (at least one 
per file), overwhelming the runner.

This is a frequent use case, and the power of the BoundedSource API is not 
needed here: small files don't need to be dynamically split, progress 
estimation is not needed, and size estimation is a "nice-to-have" but not 
strictly necessary.

In this case, it'd be better to execute the read not as a raw 
Read.from(BoundedSource) executed natively by the runner, but as a 
ParDo(splitIntoBundles) + fusion break + ParDo(read each bundle). That way the 
bundles end up as elements of an ordinary PCollection with no scalability 
limitations, and most likely with much smaller per-bundle overhead.
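A minimal plain-Java sketch of that expansion may help make the data flow concrete. It does not use the Beam API: the three static methods stand in for the three stages, and every name in it (FileBundle, splitIntoBundles as a method, fusionBreak, readBundle, compositeRead, the /data/small-N.txt paths) is hypothetical. The "fusion break" is simulated by hash-partitioning elements across a fixed number of workers; in a real pipeline it would typically be a GroupByKey-based reshuffle. The point it illustrates is that 10,000 files become 10,000 ordinary elements rather than 10,000 source splits the runner must track.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation (no Beam dependency) of splitIntoBundles -> fusion break -> read. */
final class CompositeReadSketch {

    /** Hypothetical unit of work: one small file, never split further. */
    static final class FileBundle {
        final String path;
        FileBundle(String path) { this.path = path; }
    }

    /** Stage 1, "ParDo(splitIntoBundles)": emit one plain element per file. */
    static List<FileBundle> splitIntoBundles(List<String> paths) {
        List<FileBundle> bundles = new ArrayList<>();
        for (String p : paths) {
            bundles.add(new FileBundle(p));
        }
        return bundles;
    }

    /** Stage 2, "fusion break": redistribute elements across workers by hashing the path. */
    static Map<Integer, List<FileBundle>> fusionBreak(List<FileBundle> bundles, int numWorkers) {
        Map<Integer, List<FileBundle>> byWorker = new TreeMap<>();
        for (FileBundle b : bundles) {
            int worker = Math.floorMod(b.path.hashCode(), numWorkers);
            byWorker.computeIfAbsent(worker, k -> new ArrayList<>()).add(b);
        }
        return byWorker;
    }

    /** Stage 3, "ParDo(read each bundle)": stand-in reader that returns two fake records. */
    static List<String> readBundle(FileBundle b) {
        return List.of(b.path + ":record1", b.path + ":record2");
    }

    /** Runs all three stages; each worker reads its own share of the bundles. */
    static List<String> compositeRead(List<String> paths, int numWorkers) {
        List<String> out = new ArrayList<>();
        for (List<FileBundle> workerShare : fusionBreak(splitIntoBundles(paths), numWorkers).values()) {
            for (FileBundle b : workerShare) {
                out.addAll(readBundle(b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            paths.add("/data/small-" + i + ".txt");
        }
        System.out.println(compositeRead(paths, 8).size()); // prints 20000
    }
}
```

The fusion break matters because without it a runner could fuse the cheap split step with the expensive read step, so all reading would happen wherever the (single) split ran; redistributing the elements first is what lets the reads spread out.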

Implementation options:
- The BoundedSource API could provide a hint method telling Read.from() to 
expand in this way
- Individual connectors, such as TextIO.Read, could switch between expanding 
into Read.from() or into this composite transform depending on parameters (e.g. 
TextIO.Read.withCompressionType(GZ) would always expand into the composite 
transform, because for compressed files the BoundedSource API is unnecessary)
- Something else?
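The first two options differ mainly in where the expansion decision lives. The sketch below contrasts them in plain Java; none of it is existing Beam API. ExpansionHint, prefersCompositeExpansion, Expansion, Compression, expandReadFrom and expandTextRead are all hypothetical names. The rule that an uncompressed text read stays native is an assumption for illustration only; a real connector would presumably weigh more parameters than compression type.

```java
/** Hypothetical contrast of the two expansion options; not part of the Beam API. */
final class ExpansionChoiceSketch {

    enum Expansion { NATIVE_READ, COMPOSITE_READ }

    enum Compression { UNCOMPRESSED, GZ }

    /** Option 1 (hypothetical): the source itself exposes a hint that Read.from() consults. */
    interface ExpansionHint {
        boolean prefersCompositeExpansion();
    }

    /** Option 1: Read.from() picks its expansion purely from the source's hint. */
    static Expansion expandReadFrom(ExpansionHint source) {
        return source.prefersCompositeExpansion() ? Expansion.COMPOSITE_READ : Expansion.NATIVE_READ;
    }

    /** Option 2: the connector decides from its own parameters, e.g. compression type. */
    static Expansion expandTextRead(Compression compression) {
        // A gzip file cannot be split mid-stream, so BoundedSource's dynamic splitting buys nothing.
        if (compression == Compression.GZ) {
            return Expansion.COMPOSITE_READ;
        }
        // Assumption for illustration: uncompressed reads keep the native BoundedSource path.
        return Expansion.NATIVE_READ;
    }

    public static void main(String[] args) {
        System.out.println(expandTextRead(Compression.GZ));   // prints COMPOSITE_READ
        System.out.println(expandReadFrom(() -> false));      // prints NATIVE_READ
    }
}
```

Option 1 keeps connectors unchanged but widens the BoundedSource contract; option 2 needs no API change but leaves each connector to make (and maintain) its own decision.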



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
