johnjcasey commented on code in PR #17604:
URL: https://github.com/apache/beam/pull/17604#discussion_r880541333
##########
sdks/python/apache_beam/io/avroio.py:
##########
@@ -176,20 +181,70 @@ def __init__(
name and the value being the actual data. If False, it only returns
the data.
"""
- source_from_file = partial(
+ self._source_from_file = partial(
_create_avro_source, min_bundle_size=min_bundle_size)
- self._read_all_files = filebasedsource.ReadAllFiles(
+ self._desired_bundle_size = desired_bundle_size
+ self._min_bundle_size = min_bundle_size
+ self._with_filename = with_filename
+ self.label = label
+
+ def _set_read_all_files(self):
+    """Helper function to construct the ReadAllFiles PTransform."""
+ return filebasedsource.ReadAllFiles(
True,
CompressionTypes.AUTO,
- desired_bundle_size,
- min_bundle_size,
- source_from_file,
- with_filename)
-
- self.label = label
+ self._desired_bundle_size,
+ self._min_bundle_size,
+ self._source_from_file,
+ self._with_filename)

  def expand(self, pvalue):
- return pvalue | self.label >> self._read_all_files
+ return pvalue | self.label >> self._set_read_all_files()
+
+
+class ReadAllFromAvroContinuously(ReadAllFromAvro):
+ """A ``PTransform`` for reading avro files in given file patterns.
+  This PTransform acts as a Source and continuously produces a ``PCollection``
+  of avro records.
+
+ For more details, see ``ReadAllFromAvro`` for avro parsing settings;
+ see ``apache_beam.io.fileio.MatchContinuously`` for watching settings.
+
+ ReadAllFromAvroContinuously is experimental. No backwards-compatibility
+  guarantees. Due to a limitation with Reshuffle, the current implementation
+  does not scale.
+ """
+ def __init__(self, file_pattern, label='ReadAllFilesContinuously', **kwargs):
+ """Initialize the ``ReadAllFromAvroContinuously`` transform.
+
+ Accepts args for constructor args of both ``ReadAllFromAvro`` and
+ ``apache_beam.io.fileio.MatchContinuously``.
+ """
+ kwargs_for_match = {
+ k: v
+ for (k, v) in kwargs.items()
+ if k in filebasedsource.ReadAllFilesContinuously.ARGS_FOR_MATCH
+ }
+ kwargs_for_read = {
+ k: v
+ for (k, v) in kwargs.items()
+ if k not in filebasedsource.ReadAllFilesContinuously.ARGS_FOR_MATCH
Review Comment:
I don't think we should use `not in` here, because a user could pass a kwarg
that applies to neither transform, and it would be silently forwarded to the
read side.
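Something like an explicit allow-list for the read kwargs would also let us
fail fast on typos instead of silently forwarding them. A rough sketch
(untested; `args_for_read` is a hypothetical allow-list, not something defined
in this PR):

```python
from apache_beam.io import filebasedsource


def _split_kwargs(kwargs):
  """Splits kwargs into match-args and read-args, rejecting anything that
  belongs to neither set."""
  args_for_match = filebasedsource.ReadAllFilesContinuously.ARGS_FOR_MATCH
  # Hypothetical allow-list of the read transform's own kwargs.
  args_for_read = {'min_bundle_size', 'desired_bundle_size', 'with_filename'}

  unknown = set(kwargs) - args_for_match - args_for_read
  if unknown:
    raise ValueError('Unsupported kwargs: %s' % sorted(unknown))

  return (
      {k: v for k, v in kwargs.items() if k in args_for_match},
      {k: v for k, v in kwargs.items() if k in args_for_read})
```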
##########
sdks/python/apache_beam/io/avroio.py:
##########
@@ -176,20 +181,70 @@ def __init__(
name and the value being the actual data. If False, it only returns
the data.
"""
- source_from_file = partial(
+ self._source_from_file = partial(
_create_avro_source, min_bundle_size=min_bundle_size)
- self._read_all_files = filebasedsource.ReadAllFiles(
+ self._desired_bundle_size = desired_bundle_size
+ self._min_bundle_size = min_bundle_size
+ self._with_filename = with_filename
+ self.label = label
+
+ def _set_read_all_files(self):
+    """Helper function to construct the ReadAllFiles PTransform."""
+ return filebasedsource.ReadAllFiles(
True,
CompressionTypes.AUTO,
- desired_bundle_size,
- min_bundle_size,
- source_from_file,
- with_filename)
-
- self.label = label
+ self._desired_bundle_size,
+ self._min_bundle_size,
+ self._source_from_file,
+ self._with_filename)

  def expand(self, pvalue):
- return pvalue | self.label >> self._read_all_files
+ return pvalue | self.label >> self._set_read_all_files()
+
+
+class ReadAllFromAvroContinuously(ReadAllFromAvro):
+ """A ``PTransform`` for reading avro files in given file patterns.
+  This PTransform acts as a Source and continuously produces a ``PCollection``
+  of avro records.
+
+ For more details, see ``ReadAllFromAvro`` for avro parsing settings;
+ see ``apache_beam.io.fileio.MatchContinuously`` for watching settings.
+
+ ReadAllFromAvroContinuously is experimental. No backwards-compatibility
+  guarantees. Due to a limitation with Reshuffle, the current implementation does
Review Comment:
Can we link to the limitation here?
##########
sdks/python/apache_beam/io/filebasedsource.py:
##########
@@ -449,3 +458,102 @@ def expand(self, pvalue):
| 'ReadRange' >> ParDo(
_ReadRange(
self._source_from_file, with_filename=self._with_filename)))
+
+
+class ReadAllFilesContinuously(PTransform):
+ """A file source that reads files continuously.
+
+ Pipeline authors should not use this directly. This is to be used by Read
+  PTransform authors who wish to implement file-based Read transforms that
+ read files continuously.
+
+  Unlike ``ReadAllFiles``, the file pattern is provided as a constructor
+  parameter at pipeline definition time.
+
+ ReadAllFilesContinuously is experimental. No backwards-compatibility
+  guarantees. Due to a limitation with Reshuffle, the current implementation
+  does not scale.
+ """
+ ARGS_FOR_MATCH = {
+ 'interval',
+ 'has_deduplication',
+ 'start_timestamp',
+ 'stop_timestamp',
+ 'match_updated_files',
+ 'apply_windowing'
+ }
+
+ def __init__(self,
+ file_pattern, # type: str
+ splittable, # type: bool
+ compression_type,
+ desired_bundle_size, # type: int
+ min_bundle_size, # type: int
+ source_from_file, # type: Callable[[str], iobase.BoundedSource]
+ with_filename=False, # type: bool
+ **kwargs # parameters for MatchContinuously
+ ):
+ """
+ Args:
+ file_pattern: a file pattern to match
+ splittable: If False, files won't be split into sub-ranges. If True,
+ files may or may not be split into data ranges.
+ compression_type: A ``CompressionType`` object that specifies the
+ compression type of the files that will be processed. If
+ ``CompressionType.AUTO``, system will try to automatically
+ determine the compression type based on the extension of
+ files.
+ desired_bundle_size: the desired size of data ranges that should be
+ generated when splitting a file into data ranges.
+      min_bundle_size: minimum size of data ranges that should be generated
+                       when splitting a file into data ranges.
+ source_from_file: a function that produces a ``BoundedSource`` given a
+ file name. System will use this function to generate
+                        ``BoundedSource`` objects for file paths. Note that
+                        file paths passed to this will be for individual
+                        files, not for file patterns even if the
+                        ``PCollection`` of files processed by the transform
+                        consists of file patterns.
+ with_filename: If True, returns a Key Value with the key being the file
+ name and the value being the actual data. If False, it only returns
+ the data.
+
+    Refer to ``MatchContinuously`` for additional args including 'interval',
+    'has_deduplication', 'start_timestamp', 'stop_timestamp',
+    'match_updated_files', 'apply_windowing'.
+ """
+ self._file_pattern = file_pattern
+ self._splittable = splittable
+ self._compression_type = compression_type
+ self._desired_bundle_size = desired_bundle_size
+ self._min_bundle_size = min_bundle_size
+ self._source_from_file = source_from_file
+ self._with_filename = with_filename
+ self._kwargs_for_match = {
+ k: v
+ for (k, v) in kwargs.items() if k in self.ARGS_FOR_MATCH
+ }
+
+ def expand(self, pbegin):
+ # imported locally to avoid circular import
+ from apache_beam.io.fileio import MatchContinuously
Review Comment:
Is this the convention, or is there another way to do this? I'm not a Python
style expert on this stuff.
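For reference, a function-local import is a common Python idiom for breaking
import cycles; a minimal two-module sketch (hypothetical module names, not
Beam code):

```python
# module_a.py
import module_b  # safe: module_b defers its import of module_a

CONSTANT = 42

def run():
  return module_b.helper()


# module_b.py
def helper():
  # A top-level `import module_a` here would form a cycle, since module_a
  # imports module_b while module_a is still initializing. Deferring the
  # import to call time means module_a has finished loading before it runs.
  import module_a
  return module_a.CONSTANT
```

The heavier alternative is to hoist the shared pieces into a third module that
both can import; the deferred import avoids that restructuring.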
##########
sdks/python/apache_beam/io/textio.py:
##########
@@ -605,16 +608,65 @@ def __init__(
self._desired_bundle_size = desired_bundle_size
self._min_bundle_size = min_bundle_size
self._compression_type = compression_type
- self._read_all_files = ReadAllFiles(
+ self._with_filename = with_filename
+
+ def _set_read_all_files(self):
+ """Helper function to build a ReadAllFiles PTransform."""
+ return ReadAllFiles(
True,
- compression_type,
- desired_bundle_size,
- min_bundle_size,
- source_from_file,
- with_filename)
+ self._compression_type,
+ self._desired_bundle_size,
+ self._min_bundle_size,
+ self._source_from_file,
+ self._with_filename)

  def expand(self, pvalue):
- return pvalue | 'ReadAllFiles' >> self._read_all_files
+ return pvalue | 'ReadAllFiles' >> self._set_read_all_files()
+
+
+class ReadAllFromTextContinuously(ReadAllFromText):
+ """A ``PTransform`` for reading text files in given file patterns.
+  This PTransform acts as a Source and continuously produces a ``PCollection``
+ of strings.
+
+ For more details, see ``ReadAllFromText`` for text parsing settings;
+ see ``apache_beam.io.fileio.MatchContinuously`` for watching settings.
+
+ ReadAllFromTextContinuously is experimental. No backwards-compatibility
+  guarantees. Due to a limitation with Reshuffle, the current implementation
+  does not scale.
+ """
+ def __init__(self, file_pattern, **kwargs):
+ """Initialize the ``ReadAllFromTextContinuously`` transform.
+
+ Accepts args for constructor args of both ``ReadAllFromText`` and
+ ``apache_beam.io.fileio.MatchContinuously``.
+ """
+ kwargs_for_match = {
+ k: v
+ for (k, v) in kwargs.items()
+ if k in ReadAllFilesContinuously.ARGS_FOR_MATCH
+ }
+ kwargs_for_read = {
+ k: v
+ for (k, v) in kwargs.items()
+ if k not in ReadAllFilesContinuously.ARGS_FOR_MATCH
Review Comment:
Same comment as above.
##########
sdks/python/apache_beam/io/filebasedsource.py:
##########
@@ -338,15 +338,25 @@ def default_output_coder(self):
class _ExpandIntoRanges(DoFn):
def __init__(
-      self, splittable, compression_type, desired_bundle_size,
-      min_bundle_size):
+ self,
+ splittable,
+ compression_type,
+ desired_bundle_size,
+ min_bundle_size,
+ do_match=True):
self._desired_bundle_size = desired_bundle_size
self._min_bundle_size = min_bundle_size
self._splittable = splittable
self._compression_type = compression_type
+ self._do_match = do_match

  def process(self, element, *args, **kwargs):
- match_results = FileSystems.match([element])
- for metadata in match_results[0].metadata_list:
+ if self._do_match:
Review Comment:
Why are we branching on whether or not to do the matching here?
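My reading of the intent, as a hedged sketch (not the exact PR code): elements
produced upstream by `MatchContinuously` are already `FileMetadata`, so only
plain file patterns still need `FileSystems.match`:

```python
# Sketch: do_match=True means the element is a file pattern that still needs
# expanding; do_match=False means the element is assumed to already be a
# FileMetadata produced upstream (e.g. by MatchContinuously), so matching
# again would be redundant.
def process(self, element, *args, **kwargs):
  if self._do_match:
    match_results = FileSystems.match([element])
    metadata_list = match_results[0].metadata_list
  else:
    metadata_list = [element]  # already a FileMetadata
  for metadata in metadata_list:
    # ...split each file into ranges exactly as before...
    yield metadata
```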
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]