This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ee62cbe Merge pull request #14252 from Add types to FileMatching
ee62cbe is described below
commit ee62cbe821bc7a770e5e500de37edda90b8d0261
Author: Andreas Bergmeier <[email protected]>
AuthorDate: Wed Mar 17 07:42:49 2021 +0100
Merge pull request #14252 from Add types to FileMatching
* Add types to FileMatching
Makes usage with static type checkers (pyright, mypy) and code assistance
easier.
* Limit line length
* Try to fix syntax
* Increase indentation
* Import more typing
* Import Union
* Move ReadableFile above _ReadMatchesFn, since it is referenced in type annotations
* fixes
---
sdks/python/apache_beam/io/fileio.py | 64 +++++++++++++++++++++---------------
1 file changed, 38 insertions(+), 26 deletions(-)
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 1100991..fa13b03 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -100,7 +100,10 @@ from typing import BinaryIO # pylint: disable=unused-import
from typing import Callable
from typing import DefaultDict
from typing import Dict
+from typing import Iterable
+from typing import List
from typing import Tuple
+from typing import Union
from past.builtins import unicode
@@ -154,7 +157,7 @@ class _MatchAllFn(beam.DoFn):
def __init__(self, empty_match_treatment):
self._empty_match_treatment = empty_match_treatment
- def process(self, file_pattern):
+ def process(self, file_pattern: str) -> List[filesystem.FileMetadata]:
# TODO: Should we batch the lookups?
match_results = filesystems.FileSystems.match([file_pattern])
match_result = match_results[0]
@@ -175,12 +178,12 @@ class MatchFiles(beam.PTransform):
of ``FileMetadata`` objects."""
def __init__(
self,
- file_pattern,
+ file_pattern: str,
empty_match_treatment=EmptyMatchTreatment.ALLOW_IF_WILDCARD):
self._file_pattern = file_pattern
self._empty_match_treatment = empty_match_treatment
- def expand(self, pcoll):
+ def expand(self, pcoll) -> beam.PCollection[filesystem.FileMetadata]:
return pcoll.pipeline | beam.Create([self._file_pattern]) | MatchAll()
@@ -192,16 +195,42 @@ class MatchAll(beam.PTransform):
def __init__(self, empty_match_treatment=EmptyMatchTreatment.ALLOW):
self._empty_match_treatment = empty_match_treatment
- def expand(self, pcoll):
+ def expand(
+ self,
+ pcoll: beam.PCollection,
+ ) -> beam.PCollection[filesystem.FileMetadata]:
return pcoll | beam.ParDo(_MatchAllFn(self._empty_match_treatment))
+class ReadableFile(object):
+ """A utility class for accessing files."""
+ def __init__(self, metadata, compression=None):
+ self.metadata = metadata
+ self._compression = compression
+
+ def open(self, mime_type='text/plain', compression_type=None):
+ compression = (
+ compression_type or self._compression or
+ filesystems.CompressionTypes.AUTO)
+ return filesystems.FileSystems.open(
+ self.metadata.path, mime_type=mime_type, compression_type=compression)
+
+ def read(self, mime_type='application/octet-stream'):
+ return self.open(mime_type).read()
+
+ def read_utf8(self):
+ return self.open().read().decode('utf-8')
+
+
class _ReadMatchesFn(beam.DoFn):
def __init__(self, compression, skip_directories):
self._compression = compression
self._skip_directories = skip_directories
- def process(self, file_metadata):
+ def process(
+ self,
+ file_metadata: Union[str, filesystem.FileMetadata],
+ ) -> Iterable[ReadableFile]:
metadata = (
filesystem.FileMetadata(file_metadata, 0) if isinstance(
file_metadata, (str, unicode)) else file_metadata)
@@ -218,26 +247,6 @@ class _ReadMatchesFn(beam.DoFn):
yield ReadableFile(metadata, self._compression)
-class ReadableFile(object):
- """A utility class for accessing files."""
- def __init__(self, metadata, compression=None):
- self.metadata = metadata
- self._compression = compression
-
- def open(self, mime_type='text/plain', compression_type=None):
- compression = (
- compression_type or self._compression or
- filesystems.CompressionTypes.AUTO)
- return filesystems.FileSystems.open(
- self.metadata.path, mime_type=mime_type, compression_type=compression)
-
- def read(self, mime_type='application/octet-stream'):
- return self.open(mime_type).read()
-
- def read_utf8(self):
- return self.open().read().decode('utf-8')
-
-
class ReadMatches(beam.PTransform):
"""Converts each result of MatchFiles() or MatchAll() to a ReadableFile.
@@ -246,7 +255,10 @@ class ReadMatches(beam.PTransform):
self._compression = compression
self._skip_directories = skip_directories
- def expand(self, pcoll):
+ def expand(
+ self,
+ pcoll: beam.PCollection[Union[str, filesystem.FileMetadata]],
+ ) -> beam.PCollection[ReadableFile]:
return pcoll | beam.ParDo(
_ReadMatchesFn(self._compression, self._skip_directories))