lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r448096760
########## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ##########
@@ -91,7 +91,8 @@ def __init__(
       bundle_repeat=0,
       use_state_iterables=False,
       provision_info=None,  # type: Optional[ExtendedProvisionInfo]
-      progress_request_frequency=None):
+      progress_request_frequency=None,
+      is_drain = False):

Review comment:
```suggestion
      is_drain=False):
```

########## File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py ##########
@@ -242,6 +242,15 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
   # Inherits all other tests from fn_api_runner_test.FnApiRunnerTest
+  def test_sdf_default_truncate_when_bounded(self):
+    raise unittest.SkipTest("Portable runners don't support drain now")
+
+  def test_sdf_default_truncate_when_unbounded(self):
+    raise unittest.SkipTest("Portable runners don't support drain now")
+
+  def test_sdf_with_truncate(self):
+    raise unittest.SkipTest("Portable runners don't support drain now")

Review comment:
```suggestion
    raise unittest.SkipTest("Portable runners don't yet support drain")

  def test_sdf_default_truncate_when_unbounded(self):
    raise unittest.SkipTest("Portable runners don't yet support drain")

  def test_sdf_with_truncate(self):
    raise unittest.SkipTest("Portable runners don't yet support drain")
```

########## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py ##########
@@ -1613,6 +1672,40 @@ def restriction_size(self, element, restriction):
     return restriction.size()
+class SimpleUnboundedOffsetRangeRestrictionTracker(

Review comment:
```suggestion
class UnboundedOffsetRestrictionTracker(
```

########## File path: sdks/python/apache_beam/io/iobase.py ##########
@@ -1243,6 +1243,19 @@ def try_claim(self, position):
     """
     raise NotImplementedError
+  def is_bounded(self):
+    """Identify whether the output produced by the current restriction is
+    bounded.
+
+    The value is important for the default behavior of truncate when the
+    pipeline starts to drain. If the current restriction is
+    bounded, it will be processed completely by default. If the restriction is
+    unbounded, it will be truncated into null and finish processing immediately.
+
+    The API is required to be implemented.

Review comment:
```suggestion
    """Returns whether the amount of work represented by the current
    restriction is bounded.

    The boundedness of the restriction is used to determine the default
    behavior of how to truncate restrictions when a pipeline is being
    `drained <https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#>`_.
    If the restriction is bounded, then the entire restriction will be
    processed; otherwise, the restriction will be processed until a checkpoint
    is possible.

    The API is required to be implemented.

    Returns: ``True`` if the restriction represents a finite amount of work.
    Otherwise, returns ``False``.
```

I think what I suggested for the link is correct based upon https://devguide.python.org/documenting/#external-links

########## File path: sdks/python/apache_beam/transforms/core.py ##########
@@ -320,6 +320,19 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
+  def truncate(self, element, restriction):
+    """Truncate the given restriction into finite amount of work when the
+    pipeline starts to drain.
+
+    By default, if the restriction is bounded, it will return the entire
+    restriction. If the restriction is unbounded, it will not return anything.
+
+    It's recommended to implement this API if more granularity is required.

Review comment:
```suggestion
    """Truncates the provided restriction into a restriction representing a
    finite amount of work when the pipeline is
    `draining <https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#>`_
    (see the linked document for additional details about drain).

    By default, if the restriction is bounded, then the restriction will be
    returned; otherwise, None will be returned.

    This API is optional and should only be implemented if more granularity is
    required.

    Return a truncated finite restriction if further processing is required;
    otherwise, return None to represent that no further processing of this
    restriction is required.
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
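The review above describes how a tracker's `is_bounded()` drives the default `truncate()` during drain: a bounded restriction is kept whole, an unbounded one yields `None`, and a subclass may override `truncate()` when it needs finer granularity. The small sketch below illustrates that interplay without importing Apache Beam. Only the method names `is_bounded` and `truncate` come from the review. `OffsetRange`, `OffsetTracker`, `OffsetRestrictionProvider`, `CappedOffsetRestrictionProvider`, the `UNBOUNDED` sentinel and the `drain_stop` parameter are hypothetical stand-ins, so this is not Beam's implementation.

```python
from typing import Optional

# Hypothetical stand-ins for illustration only; these are NOT Apache Beam
# classes. They mimic the drain/truncate semantics discussed in the review.

UNBOUNDED = float("inf")  # hypothetical sentinel stop offset meaning "no end"


class OffsetRange:
    """Half-open range of offsets [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def __eq__(self, other):
        return (isinstance(other, OffsetRange)
                and (self.start, self.stop) == (other.start, other.stop))

    def __repr__(self):
        return f"OffsetRange({self.start}, {self.stop})"


class OffsetTracker:
    """Minimal tracker exposing only what the truncate logic needs."""

    def __init__(self, restriction):
        self._restriction = restriction

    def current_restriction(self):
        return self._restriction

    def is_bounded(self):
        # A finite stop offset means a finite amount of work.
        return self._restriction.stop != UNBOUNDED


class OffsetRestrictionProvider:
    def create_tracker(self, restriction):
        return OffsetTracker(restriction)

    def truncate(self, element, restriction) -> Optional[OffsetRange]:
        # Default drain behavior: a bounded restriction is kept whole;
        # an unbounded one yields None, meaning no further processing.
        if self.create_tracker(restriction).is_bounded():
            return restriction
        return None


class CappedOffsetRestrictionProvider(OffsetRestrictionProvider):
    """Finer-grained override: instead of dropping an unbounded range,
    cap it at a hypothetical drain_stop offset."""

    def __init__(self, drain_stop):
        self.drain_stop = drain_stop

    def truncate(self, element, restriction):
        if self.create_tracker(restriction).is_bounded():
            return restriction
        # Never produce a range whose stop precedes its start.
        return OffsetRange(restriction.start,
                           max(restriction.start, self.drain_stop))
```

With these definitions, `OffsetRestrictionProvider().truncate(None, OffsetRange(5, UNBOUNDED))` returns `None`. `CappedOffsetRestrictionProvider(20)` turns the same restriction into `OffsetRange(5, 20)`, which is the kind of "truncated finite restriction" the suggested docstring describes for implementations that need more granularity than the default.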