Abacn commented on code in PR #17604:
URL: https://github.com/apache/beam/pull/17604#discussion_r881978749
##########
sdks/python/apache_beam/io/filebasedsource.py:
##########
@@ -449,3 +458,102 @@ def expand(self, pvalue):
| 'ReadRange' >> ParDo(
_ReadRange(
self._source_from_file, with_filename=self._with_filename)))
+
+
+class ReadAllFilesContinuously(PTransform):
+ """A file source that reads files continuously.
+
+ Pipeline authors should not use this directly. This is to be used by Read
+ PTransform authors who wishes to implement file-based Read transforms that
+ read files continuously.
+
+ Unlike ``ReadAllFiles``, patterns are provided as constructor parameter at
+ the pipeline definition time.
+
+ ReadAllFilesContinuously is experimental. No backwards-compatibility
+ guarantees. Due to the limitation on Reshuffle, current implementation does
+ not scale.
+ """
+ ARGS_FOR_MATCH = {
+ 'interval',
+ 'has_deduplication',
+ 'start_timestamp',
+ 'stop_timestamp',
+ 'match_updated_files',
+ 'apply_windowing'
+ }
+
+ def __init__(self,
+ file_pattern, # type: str
+ splittable, # type: bool
+ compression_type,
+ desired_bundle_size, # type: int
+ min_bundle_size, # type: int
+ source_from_file, # type: Callable[[str], iobase.BoundedSource]
+ with_filename=False, # type: bool
+ **kwargs # parameters for MatchContinuously
+ ):
+ """
+ Args:
+ file_pattern: a file pattern to match
+ splittable: If False, files won't be split into sub-ranges. If True,
+ files may or may not be split into data ranges.
+ compression_type: A ``CompressionType`` object that specifies the
+ compression type of the files that will be processed. If
+ ``CompressionType.AUTO``, system will try to automatically
+ determine the compression type based on the extension of
+ files.
+ desired_bundle_size: the desired size of data ranges that should be
+ generated when splitting a file into data ranges.
+ min_bundle_size: minimum size of data ranges that should be generated
when
+ splitting a file into data ranges.
+ source_from_file: a function that produces a ``BoundedSource`` given a
+ file name. System will use this function to generate
+ ``BoundedSource`` objects for file paths. Note that
file
+ paths passed to this will be for individual files, not
+ for file patterns even if the ``PCollection`` of files
+ processed by the transform consist of file patterns.
+ with_filename: If True, returns a Key Value with the key being the file
+ name and the value being the actual data. If False, it only returns
+ the data.
+
+ refer to ``MatchContinuously`` for additional args including 'interval',
+ 'has_deduplication', 'start_timestamp', 'stop_timestamp',
+ 'match_updated_files'.
+ """
+ self._file_pattern = file_pattern
+ self._splittable = splittable
+ self._compression_type = compression_type
+ self._desired_bundle_size = desired_bundle_size
+ self._min_bundle_size = min_bundle_size
+ self._source_from_file = source_from_file
+ self._with_filename = with_filename
+ self._kwargs_for_match = {
+ k: v
+ for (k, v) in kwargs.items() if k in self.ARGS_FOR_MATCH
+ }
+
+ def expand(self, pbegin):
+ # imported locally to avoid circular import
+ from apache_beam.io.fileio import MatchContinuously
Review Comment:
This pattern appears in a couple of places throughout the sdk. I understand
that this may indicate a violation of hierarchy (like a lower-leveled sdk is
importing a higher-leveled one) and ReadAllFilesContinuously should not belong
to filebasedsource.py.. Will fix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]