lostluck commented on a change in pull request #11653:
URL: https://github.com/apache/beam/pull/11653#discussion_r424735747
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -237,30 +235,72 @@ def try_split(self, fraction_of_remainder,
total_buffer_size):
current_element_progress = (
current_element_progress_object.fraction_completed)
# Now figure out where to split.
- # The units here (except for keep_of_element_remainder) are all in
- # terms of number of (possibly fractional) elements.
- remainder = total_buffer_size - self.index - current_element_progress
- keep = remainder * fraction_of_remainder
- if current_element_progress < 1:
- keep_of_element_remainder = keep / (1 - current_element_progress)
- # If it's less than what's left of the current element,
- # try splitting at the current element.
- if keep_of_element_remainder < 1:
- split = self.receivers[0].try_split(
- keep_of_element_remainder
- ) # type: Optional[Tuple[operations.SdfSplitResultsPrimary, operations.SdfSplitResultsResidual]]
- if split:
- element_primary, element_residual = split
- self.stop = self.index + 1
- return self.index - 1, element_primary, element_residual, self.stop
- # Otherwise, split at the closest element boundary.
- # pylint: disable=round-builtin
- stop_index = (
- self.index + max(1, int(round(current_element_progress + keep))))
- if stop_index < self.stop:
- self.stop = stop_index
- return self.stop - 1, None, None, self.stop
- return None
+ split = self._compute_split(
+ self.index,
+ current_element_progress,
+ self.stop,
+ fraction_of_remainder,
+ total_buffer_size,
+ allowed_split_points,
+ self.receivers[0].try_split)
+ if split:
+ self.stop = split[-1]
+ return split
+
+ @staticmethod
+ def _compute_split(
+ index,
+ current_element_progress,
+ stop,
+ fraction_of_remainder,
+ total_buffer_size,
Review comment:
The part I missed was that this is provided by the runner as part of the
DesiredSplit, rather than something computed by the SDK itself. I didn't see
that change. This all makes sense. Thanks!
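
For anyone reading along, here is a minimal standalone sketch of the element-boundary logic in the removed lines of the hunk above. It is not the new `_compute_split`: it leaves out `allowed_split_points` entirely, the name `compute_split_sketch` is hypothetical, and making the element-level callback optional is an assumption added so the function can run without a receiver.

```python
def compute_split_sketch(index, current_element_progress, stop,
                         fraction_of_remainder, total_buffer_size,
                         try_split_element=None):
    """Mirrors the removed inline logic; hypothetical helper, not Beam's API.

    Returns (primary_end, element_primary, element_residual, new_stop),
    or None when no split earlier than the current stop is possible.
    """
    # All quantities are in units of (possibly fractional) elements.
    remainder = total_buffer_size - index - current_element_progress
    keep = remainder * fraction_of_remainder

    # Assumption: the element-level splitter is optional in this sketch.
    if current_element_progress < 1 and try_split_element is not None:
        keep_of_element_remainder = keep / (1 - current_element_progress)
        # If the kept work fits inside the current element, try splitting it.
        if keep_of_element_remainder < 1:
            split = try_split_element(keep_of_element_remainder)
            if split:
                element_primary, element_residual = split
                return index - 1, element_primary, element_residual, index + 1

    # Otherwise, split at the closest element boundary.
    stop_index = index + max(1, int(round(current_element_progress + keep)))
    if stop_index < stop:
        return stop_index - 1, None, None, stop_index
    return None


if __name__ == "__main__":
    # Halfway through element 2 of 10, keep half of the remaining work.
    print(compute_split_sketch(2, 0.5, 10, 0.5, 10))
```

As in the diff, the caller would be responsible for updating its own `stop` from the last entry of the returned tuple.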