boyuanzz commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r454089322
##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -743,11 +743,36 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
     return infos
+class SdfTruncateSizedRestrictions(DoOperation):
+  def __init__(self, *args, **kwargs):
+    super(SdfTruncateSizedRestrictions, self).__init__(*args, **kwargs)
+    self.sdf_process_op = None
+
+  def current_element_progress(self):
+    # type: () -> Optional[iobase.RestrictionProgress]
+    return self.sdf_process_op.current_element_progress()
+
+  def try_split(self, fraction_of_remainder): # type: (...) -> Optional[Any]
+    result = self.sdf_process_op.try_split(fraction_of_remainder)
+    if result is not None:
+      return result
+    return None
+
+  def add_receiver(self, operation, output_index=0):
Review comment:
I overrode `add_receiver` for performance reasons. In Python, `receivers` is
a map of <output_index, list of receivers>. Looking up the single receiver in
that map every time `try_split` or `progress` is called is inefficient. We can
cache the receiver either the first time `try_split` or `progress` is called,
or when the receivers are added.
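For illustration, here is a minimal sketch of the "cache when receivers are added" option. It uses simplified, hypothetical stand-ins (`FakeOperation`, `FakeSdfProcessOp`, `TruncateOpSketch`) rather than Beam's actual `Operation`/`DoOperation` classes, and it assumes the truncate op has exactly one downstream receiver at output index 0. The downstream op is stored once in `add_receiver`, so `try_split` and `current_element_progress` delegate to it directly instead of searching the receivers map on each call.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional


class FakeOperation(object):
  """Hypothetical stand-in for a worker operation (not Beam's real class).

  Receivers are kept as a map of output_index -> list of downstream ops,
  mirroring the structure described in the comment above.
  """

  def __init__(self):
    # type: () -> None
    self.receivers = defaultdict(list)  # type: Dict[int, List[Any]]

  def add_receiver(self, operation, output_index=0):
    # type: (Any, int) -> None
    self.receivers[output_index].append(operation)


class FakeSdfProcessOp(FakeOperation):
  """Hypothetical downstream op that answers split and progress requests."""

  def try_split(self, fraction_of_remainder):
    # type: (float) -> Optional[Any]
    # Toy behavior: split only when a positive fraction is requested.
    return ('primary', 'residual') if fraction_of_remainder > 0 else None

  def current_element_progress(self):
    # type: () -> Optional[float]
    return 0.5


class TruncateOpSketch(FakeOperation):
  """Caches its single downstream op when the receiver is registered."""

  def __init__(self):
    # type: () -> None
    super(TruncateOpSketch, self).__init__()
    self.sdf_process_op = None  # type: Optional[FakeSdfProcessOp]

  def add_receiver(self, operation, output_index=0):
    # type: (Any, int) -> None
    super(TruncateOpSketch, self).add_receiver(operation, output_index)
    # Assumption: exactly one downstream op, registered at output index 0.
    if output_index == 0:
      self.sdf_process_op = operation

  def try_split(self, fraction_of_remainder):
    # type: (float) -> Optional[Any]
    # Direct delegation: no per-call lookup in self.receivers.
    return self.sdf_process_op.try_split(fraction_of_remainder)

  def current_element_progress(self):
    # type: () -> Optional[float]
    return self.sdf_process_op.current_element_progress()


op = TruncateOpSketch()
downstream = FakeSdfProcessOp()
op.add_receiver(downstream)
print(op.try_split(0.5))  # ('primary', 'residual')
print(op.current_element_progress())  # 0.5
```

The other option, caching lazily on the first `try_split` or progress call, would move the assignment into those methods behind an `if self.sdf_process_op is None` check; caching in `add_receiver` keeps that check off the split/progress path entirely.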