johnjcasey commented on code in PR #17604:
URL: https://github.com/apache/beam/pull/17604#discussion_r880541333


##########
sdks/python/apache_beam/io/avroio.py:
##########
@@ -176,20 +181,70 @@ def __init__(
         name and the value being the actual data. If False, it only returns
         the data.
     """
-    source_from_file = partial(
+    self._source_from_file = partial(
         _create_avro_source, min_bundle_size=min_bundle_size)
-    self._read_all_files = filebasedsource.ReadAllFiles(
+    self._desired_bundle_size = desired_bundle_size
+    self._min_bundle_size = min_bundle_size
+    self._with_filename = with_filename
+    self.label = label
+
+  def _set_read_all_files(self):
+    """Helper function to set _read_all_files PTransform in constructor."""
+    return filebasedsource.ReadAllFiles(
         True,
         CompressionTypes.AUTO,
-        desired_bundle_size,
-        min_bundle_size,
-        source_from_file,
-        with_filename)
-
-    self.label = label
+        self._desired_bundle_size,
+        self._min_bundle_size,
+        self._source_from_file,
+        self._with_filename)
 
   def expand(self, pvalue):
-    return pvalue | self.label >> self._read_all_files
+    return pvalue | self.label >> self._set_read_all_files()
+
+
+class ReadAllFromAvroContinuously(ReadAllFromAvro):
+  """A ``PTransform`` for reading avro files in given file patterns.
+  This PTransform acts as a Source and produces continuously a ``PCollection``
+  of strings.
+
+  For more details, see ``ReadAllFromAvro`` for avro parsing settings;
+  see ``apache_beam.io.fileio.MatchContinuously`` for watching settings.
+
+  ReadAllFromAvroContinuously is experimental.  No backwards-compatibility
+  guarantees. Due to the limitation on Reshuffle, current implementation does
+  not scale.
+  """
+  def __init__(self, file_pattern, label='ReadAllFilesContinuously', **kwargs):
+    """Initialize the ``ReadAllFromAvroContinuously`` transform.
+
+    Accepts args for constructor args of both ``ReadAllFromAvro`` and
+    ``apache_beam.io.fileio.MatchContinuously``.
+    """
+    kwargs_for_match = {
+        k: v
+        for (k, v) in kwargs.items()
+        if k in filebasedsource.ReadAllFilesContinuously.ARGS_FOR_MATCH
+    }
+    kwargs_for_read = {
+        k: v
+        for (k, v) in kwargs.items()
+        if k not in filebasedsource.ReadAllFilesContinuously.ARGS_FOR_MATCH

Review Comment:
   I don't think we should use `not in` here, because a user could pass in a 
kwarg that applies to neither set of arguments.



##########
sdks/python/apache_beam/io/avroio.py:
##########
@@ -176,20 +181,70 @@ def __init__(
         name and the value being the actual data. If False, it only returns
         the data.
     """
-    source_from_file = partial(
+    self._source_from_file = partial(
         _create_avro_source, min_bundle_size=min_bundle_size)
-    self._read_all_files = filebasedsource.ReadAllFiles(
+    self._desired_bundle_size = desired_bundle_size
+    self._min_bundle_size = min_bundle_size
+    self._with_filename = with_filename
+    self.label = label
+
+  def _set_read_all_files(self):
+    """Helper function to set _read_all_files PTransform in constructor."""
+    return filebasedsource.ReadAllFiles(
         True,
         CompressionTypes.AUTO,
-        desired_bundle_size,
-        min_bundle_size,
-        source_from_file,
-        with_filename)
-
-    self.label = label
+        self._desired_bundle_size,
+        self._min_bundle_size,
+        self._source_from_file,
+        self._with_filename)
 
   def expand(self, pvalue):
-    return pvalue | self.label >> self._read_all_files
+    return pvalue | self.label >> self._set_read_all_files()
+
+
+class ReadAllFromAvroContinuously(ReadAllFromAvro):
+  """A ``PTransform`` for reading avro files in given file patterns.
+  This PTransform acts as a Source and produces continuously a ``PCollection``
+  of strings.
+
+  For more details, see ``ReadAllFromAvro`` for avro parsing settings;
+  see ``apache_beam.io.fileio.MatchContinuously`` for watching settings.
+
+  ReadAllFromAvroContinuously is experimental.  No backwards-compatibility
+  guarantees. Due to the limitation on Reshuffle, current implementation does

Review Comment:
   Can we link to a description of the Reshuffle limitation here?



##########
sdks/python/apache_beam/io/filebasedsource.py:
##########
@@ -449,3 +458,102 @@ def expand(self, pvalue):
         | 'ReadRange' >> ParDo(
             _ReadRange(
                 self._source_from_file, with_filename=self._with_filename)))
+
+
+class ReadAllFilesContinuously(PTransform):
+  """A file source that reads files continuously.
+
+  Pipeline authors should not use this directly. This is to be used by Read
+  PTransform authors who wishes to implement file-based Read transforms that
+  read files continuously.
+
+  Unlike ``ReadAllFiles``, patterns are provided as constructor parameter at
+  the pipeline definition time.
+
+  ReadAllFilesContinuously is experimental. No backwards-compatibility
+  guarantees. Due to the limitation on Reshuffle, current implementation does
+  not scale.
+  """
+  ARGS_FOR_MATCH = {
+      'interval',
+      'has_deduplication',
+      'start_timestamp',
+      'stop_timestamp',
+      'match_updated_files',
+      'apply_windowing'
+  }
+
+  def __init__(self,
+               file_pattern, # type: str
+               splittable,  # type: bool
+               compression_type,
+               desired_bundle_size,  # type: int
+               min_bundle_size,  # type: int
+               source_from_file,  # type: Callable[[str], iobase.BoundedSource]
+               with_filename=False,  # type: bool
+               **kwargs  # parameters for MatchContinuously
+              ):
+    """
+    Args:
+      file_pattern: a file pattern to match
+      splittable: If False, files won't be split into sub-ranges. If True,
+                  files may or may not be split into data ranges.
+      compression_type: A ``CompressionType`` object that specifies the
+                  compression type of the files that will be processed. If
+                  ``CompressionType.AUTO``, system will try to automatically
+                  determine the compression type based on the extension of
+                  files.
+      desired_bundle_size: the desired size of data ranges that should be
+                           generated when splitting a file into data ranges.
+      min_bundle_size: minimum size of data ranges that should be generated 
when
+                           splitting a file into data ranges.
+      source_from_file: a function that produces a ``BoundedSource`` given a
+                        file name. System will use this function to generate
+                        ``BoundedSource`` objects for file paths. Note that 
file
+                        paths passed to this will be for individual files, not
+                        for file patterns even if the ``PCollection`` of files
+                        processed by the transform consist of file patterns.
+      with_filename: If True, returns a Key Value with the key being the file
+        name and the value being the actual data. If False, it only returns
+        the data.
+
+    refer to ``MatchContinuously`` for additional args including 'interval',
+    'has_deduplication', 'start_timestamp', 'stop_timestamp',
+    'match_updated_files'.
+    """
+    self._file_pattern = file_pattern
+    self._splittable = splittable
+    self._compression_type = compression_type
+    self._desired_bundle_size = desired_bundle_size
+    self._min_bundle_size = min_bundle_size
+    self._source_from_file = source_from_file
+    self._with_filename = with_filename
+    self._kwargs_for_match = {
+        k: v
+        for (k, v) in kwargs.items() if k in self.ARGS_FOR_MATCH
+    }
+
+  def expand(self, pbegin):
+    # imported locally to avoid circular import
+    from apache_beam.io.fileio import MatchContinuously

Review Comment:
   Is this the convention, or is there another way to do this? I'm not an 
expert on Python style.



##########
sdks/python/apache_beam/io/textio.py:
##########
@@ -605,16 +608,65 @@ def __init__(
     self._desired_bundle_size = desired_bundle_size
     self._min_bundle_size = min_bundle_size
     self._compression_type = compression_type
-    self._read_all_files = ReadAllFiles(
+    self._with_filename = with_filename
+
+  def _set_read_all_files(self):
+    """Helper function to build a ReadAllFiles PTransform."""
+    return ReadAllFiles(
         True,
-        compression_type,
-        desired_bundle_size,
-        min_bundle_size,
-        source_from_file,
-        with_filename)
+        self._compression_type,
+        self._desired_bundle_size,
+        self._min_bundle_size,
+        self._source_from_file,
+        self._with_filename)
 
   def expand(self, pvalue):
-    return pvalue | 'ReadAllFiles' >> self._read_all_files
+    return pvalue | 'ReadAllFiles' >> self._set_read_all_files()
+
+
+class ReadAllFromTextContinuously(ReadAllFromText):
+  """A ``PTransform`` for reading text files in given file patterns.
+  This PTransform acts as a Source and produces continuously a ``PCollection``
+  of strings.
+
+  For more details, see ``ReadAllFromText`` for text parsing settings;
+  see ``apache_beam.io.fileio.MatchContinuously`` for watching settings.
+
+  ReadAllFromTextContinuously is experimental.  No backwards-compatibility
+  guarantees. Due to the limitation on Reshuffle, current implementation does
+  not scale.
+  """
+  def __init__(self, file_pattern, **kwargs):
+    """Initialize the ``ReadAllFromTextContinuously`` transform.
+
+    Accepts args for constructor args of both ``ReadAllFromText`` and
+    ``apache_beam.io.fileio.MatchContinuously``.
+    """
+    kwargs_for_match = {
+        k: v
+        for (k, v) in kwargs.items()
+        if k in ReadAllFilesContinuously.ARGS_FOR_MATCH
+    }
+    kwargs_for_read = {
+        k: v
+        for (k, v) in kwargs.items()
+        if k not in ReadAllFilesContinuously.ARGS_FOR_MATCH

Review Comment:
   same comment as above



##########
sdks/python/apache_beam/io/filebasedsource.py:
##########
@@ -338,15 +338,25 @@ def default_output_coder(self):
 
 class _ExpandIntoRanges(DoFn):
   def __init__(
-      self, splittable, compression_type, desired_bundle_size, 
min_bundle_size):
+      self,
+      splittable,
+      compression_type,
+      desired_bundle_size,
+      min_bundle_size,
+      do_match=True):
     self._desired_bundle_size = desired_bundle_size
     self._min_bundle_size = min_bundle_size
     self._splittable = splittable
     self._compression_type = compression_type
+    self._do_match = do_match
 
   def process(self, element, *args, **kwargs):
-    match_results = FileSystems.match([element])
-    for metadata in match_results[0].metadata_list:
+    if self._do_match:

Review Comment:
   Why are we making the matching step conditional here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to