This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ee62cbe  Merge pull request #14252 from Add types to FileMatching
ee62cbe is described below

commit ee62cbe821bc7a770e5e500de37edda90b8d0261
Author: Andreas Bergmeier <[email protected]>
AuthorDate: Wed Mar 17 07:42:49 2021 +0100

    Merge pull request #14252 from Add types to FileMatching
    
    * Add types to FileMatching
    
    Makes usage with static typing (pyright, mypy) easier and code assists easier.
    
    * Limit line length
    
    * Try to fix syntax
    
    * Increase indentation
    
    * Import more typing
    
    * Import Union
    
    * Need to switch order of Readable File
    
    * fixes
---
 sdks/python/apache_beam/io/fileio.py | 64 +++++++++++++++++++++---------------
 1 file changed, 38 insertions(+), 26 deletions(-)

diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 1100991..fa13b03 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -100,7 +100,10 @@ from typing import BinaryIO  # pylint: disable=unused-import
 from typing import Callable
 from typing import DefaultDict
 from typing import Dict
+from typing import Iterable
+from typing import List
 from typing import Tuple
+from typing import Union
 
 from past.builtins import unicode
 
@@ -154,7 +157,7 @@ class _MatchAllFn(beam.DoFn):
   def __init__(self, empty_match_treatment):
     self._empty_match_treatment = empty_match_treatment
 
-  def process(self, file_pattern):
+  def process(self, file_pattern: str) -> List[filesystem.FileMetadata]:
     # TODO: Should we batch the lookups?
     match_results = filesystems.FileSystems.match([file_pattern])
     match_result = match_results[0]
@@ -175,12 +178,12 @@ class MatchFiles(beam.PTransform):
   of ``FileMetadata`` objects."""
   def __init__(
       self,
-      file_pattern,
+      file_pattern: str,
       empty_match_treatment=EmptyMatchTreatment.ALLOW_IF_WILDCARD):
     self._file_pattern = file_pattern
     self._empty_match_treatment = empty_match_treatment
 
-  def expand(self, pcoll):
+  def expand(self, pcoll) -> beam.PCollection[filesystem.FileMetadata]:
     return pcoll.pipeline | beam.Create([self._file_pattern]) | MatchAll()
 
 
@@ -192,16 +195,42 @@ class MatchAll(beam.PTransform):
   def __init__(self, empty_match_treatment=EmptyMatchTreatment.ALLOW):
     self._empty_match_treatment = empty_match_treatment
 
-  def expand(self, pcoll):
+  def expand(
+      self,
+      pcoll: beam.PCollection,
+  ) -> beam.PCollection[filesystem.FileMetadata]:
     return pcoll | beam.ParDo(_MatchAllFn(self._empty_match_treatment))
 
 
+class ReadableFile(object):
+  """A utility class for accessing files."""
+  def __init__(self, metadata, compression=None):
+    self.metadata = metadata
+    self._compression = compression
+
+  def open(self, mime_type='text/plain', compression_type=None):
+    compression = (
+        compression_type or self._compression or
+        filesystems.CompressionTypes.AUTO)
+    return filesystems.FileSystems.open(
+        self.metadata.path, mime_type=mime_type, compression_type=compression)
+
+  def read(self, mime_type='application/octet-stream'):
+    return self.open(mime_type).read()
+
+  def read_utf8(self):
+    return self.open().read().decode('utf-8')
+
+
 class _ReadMatchesFn(beam.DoFn):
   def __init__(self, compression, skip_directories):
     self._compression = compression
     self._skip_directories = skip_directories
 
-  def process(self, file_metadata):
+  def process(
+      self,
+      file_metadata: Union[str, filesystem.FileMetadata],
+  ) -> Iterable[ReadableFile]:
     metadata = (
         filesystem.FileMetadata(file_metadata, 0) if isinstance(
             file_metadata, (str, unicode)) else file_metadata)
@@ -218,26 +247,6 @@ class _ReadMatchesFn(beam.DoFn):
     yield ReadableFile(metadata, self._compression)
 
 
-class ReadableFile(object):
-  """A utility class for accessing files."""
-  def __init__(self, metadata, compression=None):
-    self.metadata = metadata
-    self._compression = compression
-
-  def open(self, mime_type='text/plain', compression_type=None):
-    compression = (
-        compression_type or self._compression or
-        filesystems.CompressionTypes.AUTO)
-    return filesystems.FileSystems.open(
-        self.metadata.path, mime_type=mime_type, compression_type=compression)
-
-  def read(self, mime_type='application/octet-stream'):
-    return self.open(mime_type).read()
-
-  def read_utf8(self):
-    return self.open().read().decode('utf-8')
-
-
 class ReadMatches(beam.PTransform):
   """Converts each result of MatchFiles() or MatchAll() to a ReadableFile.
 
@@ -246,7 +255,10 @@ class ReadMatches(beam.PTransform):
     self._compression = compression
     self._skip_directories = skip_directories
 
-  def expand(self, pcoll):
+  def expand(
+      self,
+      pcoll: beam.PCollection[Union[str, filesystem.FileMetadata]],
+  ) -> beam.PCollection[ReadableFile]:
     return pcoll | beam.ParDo(
         _ReadMatchesFn(self._compression, self._skip_directories))
 

Reply via email to