[
https://issues.apache.org/jira/browse/BEAM-10111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ashwin Ramaswami updated BEAM-10111:
------------------------------------
Description:
It would be good to be able to read from and write to archive files (.zip, .tar)
using fileio. The difference between this proposal and the existing
CompressionTypes support is that this would allow converting one file into multiple
files and vice versa. Here's how it might look:
*Reading all contents from archive files:*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/*.zip')
    | fileio.Extract()
    | fileio.MatchAll()
    | fileio.ReadMatches()
    | beam.Map(lambda x: (
        x.metadata.path, x.metadata._parent_archive_paths, x.read_utf8()))
)
{code}
*Nested archive example:* (find all entries inside .tar files that are themselves inside .zip files)
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/*.zip')
    | fileio.Extract()
    | fileio.MatchAll('*.tar')
    | fileio.Extract()
    | fileio.MatchAll()  # gets all entries
    | fileio.ReadMatches()
    | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
)
{code}
Note that this would involve modifying MatchAll() to take an argument that
filters the files in the PCollection produced by the earlier stage of the
pipeline.
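As a rough sketch of what such a filter argument might do internally, glob
matching over the metadata paths with `fnmatch` would suffice. (The
`FileMetadata` stand-in and `filter_by_pattern` helper below are hypothetical,
not real Beam API.)

```python
# Hypothetical sketch of pattern filtering inside MatchAll(pattern).
import fnmatch


class FileMetadata:
    """Minimal stand-in for Beam's FileMetadata record."""

    def __init__(self, path):
        self.path = path


def filter_by_pattern(metadatas, pattern):
    """Keep only entries whose path matches the glob pattern."""
    return [m for m in metadatas if fnmatch.fnmatch(m.path, pattern)]


entries = [FileMetadata('archive/inner.tar'), FileMetadata('archive/notes.txt')]
tars = filter_by_pattern(entries, '*.tar')
```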
*Reading from archive files and explicitly specifying the archive type (when it
can't be inferred from the file extension):*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/archive')
    | fileio.Extract(archivesystem=ArchiveSystem.TAR)
    | fileio.MatchAll(archive_path='*.txt')
    | fileio.ReadMatches()
    | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
)
{code}
`ArchiveSystem` would be a generic class, just like `FileSystem`, which would
allow for different implementations of methods such as `list()` and
`extract()`. It would be implemented for .zip, .tar, etc.
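A minimal sketch of what that interface could look like, with a zip-backed
implementation built on the standard-library `zipfile` module (all class and
method names here are hypothetical; a real design would mirror Beam's
`FileSystem` plugin pattern):

```python
# Hypothetical ArchiveSystem interface with a zip implementation.
import abc
import io
import zipfile


class ArchiveSystem(abc.ABC):
    @abc.abstractmethod
    def list(self, archive_bytes):
        """Return the names of the entries contained in the archive."""

    @abc.abstractmethod
    def extract(self, archive_bytes, name):
        """Return the bytes of a single entry."""


class ZipArchiveSystem(ArchiveSystem):
    def list(self, archive_bytes):
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
            return zf.namelist()

    def extract(self, archive_bytes, name):
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
            return zf.read(name)


# Build a small zip in memory to exercise the interface.
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('a.txt', 'hello')
zip_system = ZipArchiveSystem()
names = zip_system.list(buf.getvalue())
content = zip_system.extract(buf.getvalue(), 'a.txt')
```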
*Writing multiple files to an archive file:*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/files/*.txt')
    | fileio.ReadMatches()
    | fileio.Compress(archivesystem=ArchiveSystem.ZIP)
    | textio.WriteToText("output.zip")
)
{code}
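Under the hood, `Compress(archivesystem=ArchiveSystem.ZIP)` would essentially
bundle many (name, bytes) pairs into one archive payload. A sketch of that step
with the standard-library `zipfile` module (`compress_to_zip` is an illustrative
helper, not Beam API):

```python
# Hypothetical sketch of the many-files-to-one-archive step.
import io
import zipfile


def compress_to_zip(named_files):
    """Bundle an iterable of (name, data) pairs into a single .zip blob."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
        for name, data in named_files:
            zf.writestr(name, data)
    return buf.getvalue()


payload = compress_to_zip([('a.txt', b'alpha'), ('b.txt', b'beta')])
# Round-trip to confirm both entries made it into the archive.
round_trip = zipfile.ZipFile(io.BytesIO(payload)).namelist()
```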
*Writing to a .tar.gz file:*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/files/*.txt')
    | fileio.ReadMatches()
    | fileio.Compress(archivesystem=ArchiveSystem.TAR)
    | textio.WriteToText("output.tar.gz")
)
{code}
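Similarly, the tar variant of that step could lean on the standard-library
`tarfile` module, with `mode='w:gz'` producing the gzip-compressed output
directly (`compress_to_targz` is an illustrative helper, not Beam API):

```python
# Hypothetical sketch of bundling files into a gzip-compressed tar blob.
import io
import tarfile


def compress_to_targz(named_files):
    """Bundle (name, data) pairs into a single .tar.gz blob."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tf:
        for name, data in named_files:
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tf.addfile(info, io.BytesIO(data))
    return buf.getvalue()


payload = compress_to_targz([('a.txt', b'alpha')])
# Round-trip to confirm the entry survives extraction.
with tarfile.open(fileobj=io.BytesIO(payload), mode='r:gz') as tf:
    member_names = tf.getnames()
    contents = tf.extractfile('a.txt').read()
```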
> Create methods in fileio to read from / write to archive files
> --------------------------------------------------------------
>
> Key: BEAM-10111
> URL: https://issues.apache.org/jira/browse/BEAM-10111
> Project: Beam
> Issue Type: Improvement
> Components: io-py-files
> Reporter: Ashwin Ramaswami
> Assignee: Ashwin Ramaswami
> Priority: P2
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)