[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=755703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755703
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/22 11:34
            Start Date: 12/Apr/22 11:34
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848331113


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
                }
        }
 
+       if n.cweInv != nil {
+               n.PDo.we = n.cweInv.Invoke()

Review Comment:
   It actually does: watermark estimation is done at the element/restriction 
level. This will matter more once we take in watermark state, which is created 
while giving the user element/restriction access.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -664,6 +686,21 @@ func (n *ProcessSizedElementsAndRestrictions) GetInputId() string {
        return indexToInputId(0)
 }
 
+// GetOutputWatermark gets the current output watermark of the splittable unit
+// if one is defined, or returns nil otherwise.
+func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp {
+       if n.PDo.we != nil {
+               ow := timestamppb.New(n.PDo.we.CurrentWatermark())
+               owMap := make(map[string]*timestamppb.Timestamp)
+               for _, out := range n.outputs {
+                       owMap[out] = ow

Review Comment:
   I can't speak to why this decision was originally made, though I imagine it 
makes things significantly easier for the runner: the input watermark is an 
important concept, defined as `min(all incoming output watermarks)`. It is 
worth noting that we're not actually setting _every_ output to this value; 
we're only setting the outputs of this transform. At the runner level, I 
believe this map is merged with any other output watermarks that exist and is 
then used to make triggering/windowing decisions.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755703)
    Time Spent: 1h 50m  (was: 1h 40m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
