[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=755703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755703
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Apr/22 11:34
Start Date: 12/Apr/22 11:34
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848331113
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 }
 }
+ if n.cweInv != nil {
+     n.PDo.we = n.cweInv.Invoke()
Review Comment:
It actually does: watermark estimation is done at the element/restriction
level. This will become more important once we start taking in watermark state,
which is created while giving the user access to the element/restriction.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -664,6 +686,21 @@ func (n *ProcessSizedElementsAndRestrictions) GetInputId() string {
 return indexToInputId(0)
 }
+// GetOutputWatermark gets the current output watermark of the splittable unit
+// if one is defined, or returns nil otherwise.
+func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp {
+    if n.PDo.we != nil {
+        ow := timestamppb.New(n.PDo.we.CurrentWatermark())
+        owMap := make(map[string]*timestamppb.Timestamp)
+        for _, out := range n.outputs {
+            owMap[out] = ow
Review Comment:
I can't speak to why this decision was originally made, though I imagine it
makes things significantly easier for the runner, since the input watermark is
an important concept and is defined as `min(all incoming output watermarks)`.
It is worth noting that we're not actually setting _every_ output to this
value; we're only setting the outputs of this transform to it. At the runner
level, I believe this is unioned with other output watermarks that may exist,
and the resulting map is used to make triggering/windowing decisions.
Issue Time Tracking
-------------------
Worklog Id: (was: 755703)
Time Spent: 1h 50m (was: 1h 40m)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)