[ 
https://issues.apache.org/jira/browse/BEAM-11995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311797#comment-17311797
 ] 

Boyuan Zhang commented on BEAM-11995:
-------------------------------------

h3. Goal
Currently, 
[FileIO|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java]
 and 
[TextIO|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java]
 read from a 
[FileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java]
 via 
[ReadAllViaFileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java].
 We want to turn ReadAllViaFileBasedSource into an SDF (Splittable DoFn) 
implementation so that these reads benefit from dynamic splitting.

h3. Details
[ReadAllViaFileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java]
 is a composite transform that expands into "Split into ranges" -> "Reshuffle" 
-> "Read ranges": 
[code|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java#L63-L67].
 The "Read ranges" step still uses the 
[BoundedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java]
 and 
[BoundedReader|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L108]
 APIs to read from the file. When converting the ReadAllViaFileBasedSource 
transform into an SDF implementation, we can keep using the BoundedSource and 
BoundedReader APIs to read the file content, since they are already hooked up 
for multiple kinds of files: 
[query|https://github.com/apache/beam/search?q=ReadAllViaFileBasedSource]. 
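
To make the "Split into ranges" step concrete, here is a minimal plain-Java sketch of cutting one file into fixed byte ranges before any reading happens. It is only a toy model: RangeSplitSketch, Range, splitIntoRanges and the bundleSize/splittable parameters are hypothetical names for illustration, not Beam APIs, and the real transform may size its ranges differently.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names are Beam APIs.
final class RangeSplitSketch {
  /** Half-open byte range [from, to) within one file. */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  /**
   * Cuts a file of fileSize bytes into ranges of at most bundleSize bytes.
   * A file that cannot be read from the middle is kept as a single range.
   */
  static List<Range> splitIntoRanges(long fileSize, long bundleSize, boolean splittable) {
    if (bundleSize <= 0) {
      throw new IllegalArgumentException("bundleSize must be positive");
    }
    List<Range> ranges = new ArrayList<>();
    if (!splittable) {
      ranges.add(new Range(0, fileSize));
      return ranges;
    }
    for (long start = 0; start < fileSize; start += bundleSize) {
      ranges.add(new Range(start, Math.min(start + bundleSize, fileSize)));
    }
    return ranges;
  }

  public static void main(String[] args) {
    // The ranges are fixed here, before any reading starts.
    System.out.println(splitIntoRanges(250, 100, true));
  }
}
```

The point of the sketch is that the ranges are decided up front: once a range is handed to a reader, nothing here can shrink it if that reader turns out to be slow.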

Another option is to build an SDF-based read for every FileBasedSource: 
[AvroSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java],
 
[CompressedSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java],
 
[TextSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java]
 , 
[TFRecordSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L488].
 The SDF should take a ReadableFile as input and emit the file content. Then 
we can replace all uses of ReadAllViaFileBasedSource with this SDF 
implementation.
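
As a rough illustration of what dynamic splitting adds, the sketch below models a reader that must claim each offset before emitting it, while a runner may take away the unclaimed tail of the range at any time. This is a simplified plain-Java model under stated assumptions: ToyTracker, readUpTo and the fractionToKeep parameter are hypothetical names, not Beam's restriction tracker API, and the records are an in-memory list rather than a real file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: ToyTracker imitates the idea of a restriction tracker,
// it is not a Beam class.
final class DynamicSplitSketch {
  static final class ToyTracker {
    private long to;          // exclusive end; shrinks when a split is accepted
    private long lastClaimed; // last offset handed to the reader

    ToyTracker(long from, long to) {
      this.to = to;
      this.lastClaimed = from - 1;
    }

    synchronized long nextUnclaimed() {
      return lastClaimed + 1;
    }

    /** Returns false once the offset falls outside the (possibly shrunk) range. */
    synchronized boolean tryClaim(long offset) {
      if (offset >= to) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }

    /**
     * Keeps the given fraction of the unclaimed work and gives the rest away.
     * Returns the residual range {from, to}, or null if nothing can be split off.
     */
    synchronized long[] trySplit(double fractionToKeep) {
      long firstUnclaimed = lastClaimed + 1;
      long splitPos = firstUnclaimed + (long) ((to - firstUnclaimed) * fractionToKeep);
      if (splitPos >= to) {
        return null;
      }
      long[] residual = {splitPos, to};
      to = splitPos;
      return residual;
    }
  }

  /** Emits records for as many offsets as the tracker allows, up to maxRecords. */
  static List<String> readUpTo(List<String> records, ToyTracker tracker, int maxRecords) {
    List<String> out = new ArrayList<>();
    long offset = tracker.nextUnclaimed();
    while (out.size() < maxRecords && tracker.tryClaim(offset)) {
      out.add(records.get((int) offset));
      offset++;
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> records =
        Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9");
    ToyTracker primary = new ToyTracker(0, 10);
    System.out.println(readUpTo(records, primary, 4));   // read a few records first
    long[] residual = primary.trySplit(0.5);             // runner asks for a split mid-read
    System.out.println(readUpTo(records, primary, 100)); // primary finishes its smaller range
    ToyTracker other = new ToyTracker(residual[0], residual[1]);
    System.out.println(readUpTo(records, other, 100));   // residual is read elsewhere
  }
}
```

In contrast to ranges that are fixed before reading, the split here happens while the primary reader is already partway through its range, and every record is still emitted exactly once across the primary and the residual.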

h3. Code examples
* Build an SDF on top of the BoundedSource and BoundedReader APIs: 
[BoundedSourceAsSDFWrapperFn|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L261]
* Build an SDF that replaces BoundedSource/BoundedReader: 
[ParquetIO|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L716]

> Implement FileIO/TextIO on top of Splittable DoFn
> -------------------------------------------------
>
>                 Key: BEAM-11995
>                 URL: https://issues.apache.org/jira/browse/BEAM-11995
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>            Reporter: Boyuan Zhang
>            Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
