damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848331113


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
                }
        }
 
+       if n.cweInv != nil {
+               n.PDo.we = n.cweInv.Invoke()

Review Comment:
   It actually does: watermark estimation is done at the element/restriction 
level. This will become more important once we start taking in watermark 
state, which is created while giving the user access to the element and 
restriction.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -664,6 +686,21 @@ func (n *ProcessSizedElementsAndRestrictions) GetInputId() string {
        return indexToInputId(0)
 }
 
+// GetOutputWatermark gets the current output watermark of the splittable unit
+// if one is defined, or returns nil otherwise.
+func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp {
+       if n.PDo.we != nil {
+               ow := timestamppb.New(n.PDo.we.CurrentWatermark())
+               owMap := make(map[string]*timestamppb.Timestamp)
+               for _, out := range n.outputs {
+                       owMap[out] = ow

Review Comment:
   I can't speak to why this decision was originally made, though I imagine it 
makes things significantly easier for the runner: the input watermark is an 
important concept, and it is defined as `min(all incoming output watermarks)`. 
It is worth noting that we're not actually setting _every_ output to this 
value; we're only setting the outputs of this transform to it. At the runner 
level, I believe this map is unioned with any other output watermarks that 
exist, and the result is used to make triggering/windowing decisions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
