[ 
https://issues.apache.org/jira/browse/BEAM-10111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Ramaswami updated BEAM-10111:
------------------------------------
    Description: 
It would be good to be able to read from / write to archive files (.zip, .tar) 
using fileio. The difference between this proposal and the existing 
CompressionTypes support is that this would allow converting one file into 
multiple files and vice versa. Here's how it might look:

*Reading all contents from archive files:*

{code:python}
    files = (
        p
        | fileio.MatchFiles('hdfs://path/to/*.zip')
        | fileio.Extract()
        | fileio.MatchAll()
        | fileio.ReadMatches()
        | beam.Map(lambda x: (x.metadata.path,
                              x.metadata._parent_archive_paths,
                              x.read_utf8()))
    )
{code}
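Outside of Beam, the one-file-to-many-files expansion that the proposed Extract() would perform can be sketched with the stdlib zipfile module (function and path names here are illustrative, not part of any existing fileio API):

```python
import io
import zipfile

def extract_zip_entries(archive_bytes, archive_path):
    """Yield (entry_path, parent_archive_path, payload) for each file in a zip.

    Mirrors the hypothetical Extract() -> MatchAll() -> ReadMatches() chain:
    one archive element in, one element out per contained file.
    """
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
        for info in zf.infolist():
            if not info.is_dir():
                yield (info.filename, archive_path, zf.read(info.filename))

# Build a tiny in-memory archive to demonstrate the expansion.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.txt", "hello")
    zf.writestr("b.txt", "world")

entries = list(extract_zip_entries(buf.getvalue(), "hdfs://path/to/x.zip"))
```

A DoFn wrapping a generator like this is roughly what Extract() would encapsulate per archive format.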

*Nested archive example:* (read all entries inside .tar files that are 
themselves inside .zip files)

{code:python}
    files = (
        p
        | fileio.MatchFiles('hdfs://path/to/*.zip')
        | fileio.Extract()
        | fileio.MatchAll('*.tar')
        | fileio.Extract()
        | fileio.MatchAll() # gets all entries
        | fileio.ReadMatches()
        | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
    )
{code}

Note that in this case, this would involve modifying MatchAll() to take a glob 
argument, which would filter the files in the PCollection produced by the 
earlier stage of the pipeline.
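The glob filtering that such a MatchAll() argument would perform could behave like stdlib fnmatch applied to entry paths (a sketch of the intended semantics, not existing Beam behavior):

```python
import fnmatch

def match_entries(paths, pattern="*"):
    """Keep only entry paths matching the glob, as MatchAll(pattern) might."""
    return [p for p in paths if fnmatch.fnmatch(p, pattern)]

# Entry names as they might appear inside an extracted archive.
entries = ["inner.tar", "readme.txt", "data/archive.tar"]
```

With no argument the default `"*"` keeps everything, matching the current MatchAll() behavior.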

*Reading from archive files while explicitly specifying the archive type (when 
it can't be inferred from the file extension):*

{code:python}
    files = (
        p
        | fileio.MatchFiles('hdfs://path/to/archive')
        | fileio.Extract(archivesystem=ArchiveSystem.TAR)
        | fileio.MatchAll('*.txt')
        | fileio.ReadMatches()
        | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
    )
{code}

`ArchiveSystem` would be a generic class, just like `FileSystem`, which would 
allow for different implementations of methods such as `list()` and 
`extract()`. It would be implemented for .zip, .tar, etc.
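A minimal sketch of what such an ArchiveSystem interface and a zip implementation might look like, using only the stdlib (the class and method names come from this proposal, not an existing Beam API):

```python
import abc
import io
import zipfile

class ArchiveSystem(abc.ABC):
    """Analogous to FileSystem: one subclass per archive format."""

    @abc.abstractmethod
    def list(self, archive):
        """Return the names of the file entries contained in the archive."""

    @abc.abstractmethod
    def extract(self, archive, name):
        """Return the bytes of a single named entry."""

class ZipArchiveSystem(ArchiveSystem):
    def list(self, archive):
        with zipfile.ZipFile(archive) as zf:
            return [i.filename for i in zf.infolist() if not i.is_dir()]

    def extract(self, archive, name):
        with zipfile.ZipFile(archive) as zf:
            return zf.read(name)

# Demonstrate against an in-memory zip.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("hello.txt", "hi")
zip_fs = ZipArchiveSystem()
```

A TarArchiveSystem would implement the same two methods on top of tarfile, and Extract() would dispatch to the right subclass by extension or by the explicit `archivesystem` argument.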

*Writing multiple files to an archive file:*

{code:python}
    files = (
        p
        | fileio.MatchFiles('hdfs://path/to/files/*.txt')
        | fileio.ReadMatches()
        | fileio.Compress(archivesystem=ArchiveSystem.ZIP)
        | textio.WriteToText("output.zip")
    )
{code}

*Writing to a .tar.gz file:*

{code:python}
    files = (
        p
        | fileio.MatchFiles('hdfs://path/to/files/*.txt')
        | fileio.ReadMatches()
        | fileio.Compress(archivesystem=ArchiveSystem.TAR)
        | textio.WriteToText("output.tar.gz")
    )
{code}
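For the .tar.gz case, the stdlib already composes TAR packing with gzip compression; a sketch of packing (name, bytes) pairs into a single archive, roughly what the proposed Compress(archivesystem=ArchiveSystem.TAR) step would emit before the write (function name is illustrative):

```python
import io
import tarfile

def pack_tar_gz(files):
    """Pack an iterable of (name, payload_bytes) pairs into .tar.gz bytes."""
    buf = io.BytesIO()
    # mode="w:gz" layers gzip compression over the tar stream in one step.
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, payload in files:
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()

archive = pack_tar_gz([("a.txt", b"alpha"), ("b.txt", b"beta")])
```

This is the many-files-to-one-file direction that CompressionTypes alone cannot express today.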


> Create methods in fileio to read from / write to archive files
> --------------------------------------------------------------
>
>                 Key: BEAM-10111
>                 URL: https://issues.apache.org/jira/browse/BEAM-10111
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-files
>            Reporter: Ashwin Ramaswami
>            Assignee: Ashwin Ramaswami
>            Priority: P2
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
