boyuanzz commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r448060178



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
 
+  def truncate(self, element, restriction):
+    """Truncate the given restriction into finite amount of work when the
+    pipeline starts to drain.
+
+    By default, if the restriction is bounded, it will return the entire 
current
+    restriction. If the restriction is unbounded, it will return None.
+
+    The method throws NotImplementError when RestrictionTracker.is_bounded() is
+    not implemented.
+
+    It's recommended to implement this API if more granularity is required.
+    """
+    restriction_tracker = self.create_tracker(restriction)
+    if restriction_tracker.is_bounded():
+      return restriction

Review comment:
       I think we should return here since we expect `truncate` to produce only 
one truncated restriction per input restriction.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to