[ https://issues.apache.org/jira/browse/BEAM-11020?focusedWorklogId=499600&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-499600 ]

ASF GitHub Bot logged work on BEAM-11020:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Oct/20 19:42
            Start Date: 12/Oct/20 19:42
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503484424



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -354,12 +354,12 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
                return SplitResult{PI: s - 1, RI: s}, nil
        }
        // Otherwise, perform a sub-element split.
-       p, r, err := su.Split(fr)
+       ps, rs, err := su.Split(fr)
        if err != nil {
                return SplitResult{}, err
        }
 
-       if p == nil || r == nil { // Unsuccessful split.
+       if len(ps) == 0 || len(rs) == 0 { // Unsuccessful split.

Review comment:
       Nit: nil seemed more explicit; why was this changed? (On that note, if one is empty, must the other be empty as well, or is it OK to have one non-empty and treat that as an unsuccessful split?)
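A minimal sketch of the two questions above, assuming the split outputs are `[][]byte` slices (as the `PS[0]` indexing later in this diff suggests). In Go, `len` of a nil slice is 0, so the `len` check covers both nil and empty results. The helper below (hypothetical name `classifySplit`, not part of the SDK) additionally treats a one-sided result as an error; that is only one possible policy, not confirmed SDK behavior.

```go
package main

import "fmt"

// classifySplit is a hypothetical helper, not part of the Beam Go SDK.
// ps and rs are assumed to be encoded primaries and residuals ([][]byte).
// It reports ok=true for a usable split, ok=false with a nil error for an
// unsuccessful split, and an error when exactly one side is empty.
func classifySplit(ps, rs [][]byte) (ok bool, err error) {
	switch {
	case len(ps) == 0 && len(rs) == 0:
		// len of a nil slice is 0, so this case also covers nil slices.
		return false, nil
	case len(ps) == 0 || len(rs) == 0:
		// Assumed policy: a one-sided result signals a bug, not a no-op split.
		return false, fmt.Errorf("inconsistent split: %d primaries, %d residuals", len(ps), len(rs))
	default:
		return true, nil
	}
}

func main() {
	var nilSlice [][]byte
	fmt.Println(nilSlice == nil, len(nilSlice) == 0) // true true

	ok, err := classifySplit(nil, [][]byte{})
	fmt.Println(ok, err) // false <nil>

	ok, err = classifySplit([][]byte{{1}}, nil)
	fmt.Println(ok, err) // false inconsistent split: 1 primaries, 0 residuals

	ok, err = classifySplit([][]byte{{1}}, [][]byte{{2}})
	fmt.Println(ok, err) // true <nil>
}
```

With the `len` form, a nil and an empty slice behave identically, so the change mainly matters if some code path returns a non-nil empty slice.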

##########
File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
##########
@@ -180,8 +180,8 @@ func (tracker *Tracker) TrySplit(fraction float64) (primary, residual interface{
 
 // GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
 func (tracker *Tracker) GetProgress() (done, remaining float64) {
-       done = float64(tracker.claimed - tracker.rest.Start)
-       remaining = float64(tracker.rest.End - tracker.claimed)
+       done = float64((tracker.claimed + 1) - tracker.rest.Start)

Review comment:
       Was this an existing bug?
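Whether this was an existing bug depends on what `claimed` holds. A minimal sketch, assuming `claimed` is the last claimed offset and starts at `Start - 1` before anything is claimed (an assumption about the tracker that this diff does not show). Under that convention the old formula reports `done = -1` for a fresh tracker, while the `+ 1` form reports 0. The matching `remaining` line is also an assumption, since the diff excerpt cuts off before its new version; `tracker` and `restriction` are hypothetical stand-ins for the SDK types.

```go
package main

import "fmt"

// restriction is a hypothetical half-open offset range [Start, End).
type restriction struct{ Start, End int64 }

// tracker is a hypothetical stand-in for an offset-range tracker. claimed is
// assumed to hold the last claimed offset, starting at Start-1.
type tracker struct {
	rest    restriction
	claimed int64
}

func newTracker(r restriction) *tracker {
	return &tracker{rest: r, claimed: r.Start - 1}
}

// progress counts the last claimed offset as done, so done+remaining always
// equals the size of the restriction and neither value starts negative.
func (t *tracker) progress() (done, remaining float64) {
	done = float64((t.claimed + 1) - t.rest.Start)
	remaining = float64(t.rest.End - (t.claimed + 1))
	return done, remaining
}

func main() {
	t := newTracker(restriction{Start: 10, End: 20})
	d, r := t.progress()
	fmt.Println(d, r) // 0 10

	t.claimed = 14 // offsets 10..14 claimed
	d, r = t.progress()
	fmt.Println(d, r) // 5 5
}
```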

##########
File path: sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
##########
@@ -103,11 +103,11 @@ func TestDynamicSplit(t *testing.T) {
                        // with the input coder to the path.
                        // TODO(BEAM-10579) Switch to using splittable unit's input coder
                        // once that is implemented.
-                       p, err := decodeDynSplitElm(splitRes.split.PS, cdr)
+                       p, err := decodeDynSplitElm(splitRes.split.PS[0], cdr)

Review comment:
       Is there a convention here you could use to assert that there's only one element while getting it, rather than letting any (unexpected?) other elements in the list be silently dropped?
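One option is a small helper that fails unless exactly one element is present. A sketch, assuming `PS`/`RS` are `[][]byte` after this change; `singleElm` is a hypothetical name, and in the actual test the error would be reported with `t.Fatalf` rather than printed.

```go
package main

import "fmt"

// singleElm is a hypothetical helper: it returns the only element of elms,
// or an error if elms holds zero or more than one element, so unexpected
// extra elements are reported instead of silently dropped.
func singleElm(elms [][]byte) ([]byte, error) {
	if len(elms) != 1 {
		return nil, fmt.Errorf("expected exactly 1 element, got %d", len(elms))
	}
	return elms[0], nil
}

func main() {
	b, err := singleElm([][]byte{[]byte("primary")})
	fmt.Println(string(b), err) // primary <nil>

	_, err = singleElm([][]byte{[]byte("a"), []byte("b")})
	fmt.Println(err) // expected exactly 1 element, got 2
}
```

At the call site, the result of `singleElm(splitRes.split.PS)` would be checked for an error before being passed to `decodeDynSplitElm`.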




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 499600)
    Time Spent: 50m  (was: 40m)

> Fix multi-window SDF splitting in Go.
> -------------------------------------
>
>                 Key: BEAM-11020
>                 URL: https://issues.apache.org/jira/browse/BEAM-11020
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: P2
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> When splitting an element that is being processed in multiple windows, the
> element is currently split based on the progress of the current
> element+window pair, and the primary and residual both contain all windows
> from the original. This is incorrect because some windows in the residual
> have already been processed.
> The correct way to perform this split is to put the element plus all fully
> processed windows in the primary, the element plus all fully unprocessed
> windows in the residual, and to split only the current element+window pair.
> This fix should already be in Java and Python, so those can be referenced.
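The split described in the issue can be sketched as follows. The types are hypothetical, not the Go SDK's, and the sketch assumes every window starts from the same original restriction, so fully processed and untouched windows keep that restriction unchanged while only the current window's restriction is split.

```go
package main

import "fmt"

// Hypothetical types for illustration; they are not the Go SDK's types.
type restriction struct{ Start, End int64 }

type windowedRest struct {
	windows []string
	rest    restriction
}

// splitWindowed splits an element being processed across windows. cur is the
// (valid) index of the window currently being processed, full is the
// element's original restriction, and prim/resid are the result of splitting
// the current element+window pair.
func splitWindowed(windows []string, cur int, full, prim, resid restriction) (ps, rs []windowedRest) {
	if cur > 0 {
		// Fully processed windows stay in the primary with the whole restriction.
		ps = append(ps, windowedRest{windows: windows[:cur], rest: full})
	}
	// Only the current window is split between primary and residual.
	ps = append(ps, windowedRest{windows: windows[cur : cur+1], rest: prim})
	rs = append(rs, windowedRest{windows: windows[cur : cur+1], rest: resid})
	if cur+1 < len(windows) {
		// Untouched windows go to the residual with the whole restriction.
		rs = append(rs, windowedRest{windows: windows[cur+1:], rest: full})
	}
	return ps, rs
}

func main() {
	ws := []string{"w0", "w1", "w2"}
	full := restriction{0, 10}
	ps, rs := splitWindowed(ws, 1, full, restriction{0, 4}, restriction{4, 10})
	fmt.Println(ps) // [{[w0] {0 10}} {[w1] {0 4}}]
	fmt.Println(rs) // [{[w1] {4 10}} {[w2] {0 10}}]
}
```

Returning separate entries per group of windows is a design choice in this sketch; it keeps each output a single restriction paired with the windows it applies to.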



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
