chamikaramj commented on a change in pull request #14439:
URL: https://github.com/apache/beam/pull/14439#discussion_r609010361
##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1499,33 +1499,36 @@ def source(self):
return self._source_bundle.source
def try_split(self, fraction_of_remainder):
- consumed_fraction = self.range_tracker().fraction_consumed()
- fraction = (
- consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
- position = self.range_tracker().position_at_fraction(fraction)
- # Need to stash current stop_pos before splitting since
- # range_tracker.split will update its stop_pos if splits
- # successfully.
- stop_pos = self._source_bundle.stop_position
- split_result = self.range_tracker().try_split(position)
- if split_result:
- split_pos, split_fraction = split_result
- primary_weight = self._source_bundle.weight * split_fraction
- residual_weight = self._source_bundle.weight - primary_weight
- # Update self to primary weight and end position.
- self._source_bundle = SourceBundle(
- primary_weight,
- self._source_bundle.source,
- self._source_bundle.start_position,
- split_pos)
- return (
- self,
- _SDFBoundedSourceRestriction(
- SourceBundle(
- residual_weight,
- self._source_bundle.source,
- split_pos,
- stop_pos)))
+ try:
+ consumed_fraction = self.range_tracker().fraction_consumed()
+ fraction = (
+ consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+ position = self.range_tracker().position_at_fraction(fraction)
+ # Need to stash current stop_pos before splitting since
+ # range_tracker.split will update its stop_pos if splits
+ # successfully.
+ stop_pos = self._source_bundle.stop_position
+ split_result = self.range_tracker().try_split(position)
+ if split_result:
+ split_pos, split_fraction = split_result
+ primary_weight = self._source_bundle.weight * split_fraction
+ residual_weight = self._source_bundle.weight - primary_weight
+ # Update self to primary weight and end position.
+ self._source_bundle = SourceBundle(
+ primary_weight,
+ self._source_bundle.source,
+ self._source_bundle.start_position,
+ split_pos)
+ return (
+ self,
+ _SDFBoundedSourceRestriction(
+ SourceBundle(
+ residual_weight,
+ self._source_bundle.source,
+ split_pos,
+ stop_pos)))
+ except Exception:
+ return None
Review comment:
Add a comment noting that the state of this object does not change
when "None" is returned here (we should probably also add pydocs that
mention this).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org