AnandInguva commented on code in PR #25393:
URL: https://github.com/apache/beam/pull/25393#discussion_r1101891603
##########
sdks/python/apache_beam/ml/inference/utils.py:
##########
@@ -46,3 +61,101 @@ def _convert_to_result(
y in zip(batch, predictions_per_tensor)
]
return [PredictionResult(x, y, model_id) for x, y in zip(batch, predictions)]
+
+
+class _ConvertIterToSingleton(beam.DoFn):
+ """
+ Internal only; No backwards compatibility.
+
+ The MatchContinuously transform examines all files present in a given
+ directory and returns those that have timestamps older than the
+ pipeline's start time. This can produce an Iterable rather than a
+ Singleton. This class only returns the file path when it is first
+ encountered, and it is cached as part of the side input caching mechanism.
+ If the path is seen again, it will not return anything.
+ By doing this, we can ensure that the output of this transform can be wrapped
+ with beam.pvalue.AsSingleton().
+ """
+ COUNT_STATE = CombiningValueStateSpec('count', combine_fn=sum)
+
+ def process(self, element, count_state=beam.DoFn.StateParam(COUNT_STATE)):
+ counter = count_state.read()
+ if counter == 0:
+ count_state.add(1)
+ yield element[1]
+
+
+class _GetLatestFileByTimeStamp(beam.DoFn):
+ """
+ Internal only; No backwards compatibility.
+
+ This DoFn checks the timestamps of files against the time that the pipeline
+ began running. It returns the files that were modified after the pipeline
+ started. If no such files are found, it returns a default file as fallback.
+ """
+ TIME_STATE = CombiningValueStateSpec(
+ 'count', combine_fn=partial(max, default=_START_TIME_STAMP))
+
+ def process(
+ self, element, time_state=beam.DoFn.StateParam(TIME_STATE)
+ ) -> List[Tuple[str, ModelMetdata]]:
+ _, file_metadata = element
+ new_ts = file_metadata.last_updated_in_seconds
+ old_ts = time_state.read()
+ if new_ts > old_ts:
+ # time_state.clear()
+ time_state.add(new_ts)
+ model_path = file_metadata.path
+ else:
+ model_path = ''
+
+ model_name = os.path.splitext(os.path.basename(model_path))[0]
+ return [
+ (model_path, ModelMetdata(model_id=model_path, model_name=model_name))
+ ]
+
+
+class WatchFilePattern(beam.PTransform):
+ def __init__(
+ self,
+ file_pattern,
+ interval=360,
+ stop_timestamp=MAX_TIMESTAMP,
+ ):
+ """
+ Watches a directory for updates to files matching a given file pattern.
Review Comment:
Updated doc string
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]