[ 
https://issues.apache.org/jira/browse/BEAM-9935?focusedWorklogId=432837&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432837
 ]

ASF GitHub Bot logged work on BEAM-9935:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/May/20 21:16
            Start Date: 13/May/20 21:16
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #11653:
URL: https://github.com/apache/beam/pull/11653#discussion_r424735747



##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -237,30 +235,72 @@ def try_split(self, fraction_of_remainder, total_buffer_size):
           current_element_progress = (
               current_element_progress_object.fraction_completed)
       # Now figure out where to split.
-      # The units here (except for keep_of_element_remainder) are all in
-      # terms of number of (possibly fractional) elements.
-      remainder = total_buffer_size - self.index - current_element_progress
-      keep = remainder * fraction_of_remainder
-      if current_element_progress < 1:
-        keep_of_element_remainder = keep / (1 - current_element_progress)
-        # If it's less than what's left of the current element,
-        # try splitting at the current element.
-        if keep_of_element_remainder < 1:
-          split = self.receivers[0].try_split(
-              keep_of_element_remainder
-          )  # type: Optional[Tuple[operations.SdfSplitResultsPrimary, operations.SdfSplitResultsResidual]]
-          if split:
-            element_primary, element_residual = split
-            self.stop = self.index + 1
-            return self.index - 1, element_primary, element_residual, self.stop
-      # Otherwise, split at the closest element boundary.
-      # pylint: disable=round-builtin
-      stop_index = (
-          self.index + max(1, int(round(current_element_progress + keep))))
-      if stop_index < self.stop:
-        self.stop = stop_index
-        return self.stop - 1, None, None, self.stop
-    return None
+      split = self._compute_split(
+          self.index,
+          current_element_progress,
+          self.stop,
+          fraction_of_remainder,
+          total_buffer_size,
+          allowed_split_points,
+          self.receivers[0].try_split)
+      if split:
+        self.stop = split[-1]
+      return split
+
+  @staticmethod
+  def _compute_split(
+      index,
+      current_element_progress,
+      stop,
+      fraction_of_remainder,
+      total_buffer_size,

Review comment:
       The part I missed was that this is provided by the runner as part of the 
DesiredSplit, rather than something computed by the SDK itself. I didn't see 
that change. This all makes sense. Thanks!
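
For context, the element-boundary part of the logic removed in the diff above, 
extended to honor a list of allowed split points, might look roughly like the 
sketch below. This is not the actual _compute_split from the pull request: the 
function name closest_allowed_split is hypothetical, sub-element (SDF) 
splitting is left out, and it assumes that an empty allowed_split_points means 
"no constraint".

```python
from typing import Optional, Sequence


def closest_allowed_split(
    index: int,
    current_element_progress: float,
    stop: int,
    fraction_of_remainder: float,
    total_buffer_size: int,
    allowed_split_points: Sequence[int] = (),
) -> Optional[int]:
    """Return a new stop index, or None if no valid split exists.

    Hypothetical helper for illustration; element-boundary splits only.
    """
    # Remaining work, measured in (possibly fractional) elements.
    remainder = total_buffer_size - index - current_element_progress
    keep = remainder * fraction_of_remainder
    # Ideal stop index; never earlier than the end of the current element.
    ideal = index + max(1, int(round(current_element_progress + keep)))

    if not allowed_split_points:
        # Assumption: an empty list means any element boundary is allowed.
        return ideal if ideal < stop else None

    # Only boundaries after the current element and before the current stop
    # are usable, and only if the runner listed them as allowed.
    allowed = set(allowed_split_points)
    candidates = [c for c in range(index + 1, stop) if c in allowed]
    if not candidates:
        return None
    # Pick the allowed boundary closest to the ideal one.
    return min(candidates, key=lambda c: abs(c - ideal))
```

With index=0, stop=10, total_buffer_size=10 and fraction_of_remainder=0.5, the 
unconstrained call returns 5, while allowed_split_points=[2, 7] moves the 
split to 7, the nearest allowed boundary.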




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 432837)
    Time Spent: 0.5h  (was: 20m)

> Resolve differences in allowed_split_point implementations
> ----------------------------------------------------------
>
>                 Key: BEAM-9935
>                 URL: https://issues.apache.org/jira/browse/BEAM-9935
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go, sdk-java-harness, sdk-py-harness
>            Reporter: Luke Cwik
>            Assignee: Robert Bradshaw
>            Priority: Blocker
>             Fix For: 2.22.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> [Java SDK harness|https://github.com/apache/beam/blob/d82d061aa303430f3d2853f397f3130fae6200cd/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java#L223]
>  doesn't support it yet, which is also safe.
> [Go SDK harness|https://github.com/apache/beam/blob/d82d061aa303430f3d2853f397f3130fae6200cd/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L273]
>  only supports splits if points are specified, and it doesn't use the 
> fraction at all.
> [Python SDK harness|https://github.com/apache/beam/blob/d82d061aa303430f3d2853f397f3130fae6200cd/sdks/python/apache_beam/runners/worker/bundle_processor.py#L947]
>  ignores the split points, meaning that it may return a split location that 
> is invalid given the runner's limitations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
