[
https://issues.apache.org/jira/browse/BEAM-14484?focusedWorklogId=772931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772931
]
ASF GitHub Bot logged work on BEAM-14484:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/May/22 17:17
Start Date: 20/May/22 17:17
Worklog Time Spent: 10m
Work Description: jrmccluskey commented on code in PR #17724:
URL: https://github.com/apache/beam/pull/17724#discussion_r878373219
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
return p, r, nil
}
+// Checkpoint splits the remaining work in a restriction into residuals to be resumed
+// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
+	// Get the watermark state immediately so that we don't overestimate our current watermark.
+	var pWeState interface{}
+	var rWeState interface{}
+	rWeState = n.wesInv.Invoke(n.PDo.we)
+	pWeState = rWeState
+	// If we've processed elements, the initial watermark estimator state will be set.
+	// In that case we should hold the output watermark at that initial state so that we don't
+	// advance past where the current elements are holding the watermark.
+	if n.initWeS != nil {
+		pWeState = n.initWeS
+	}
Review Comment:
Routing through split was the better pattern for sure, so we can avoid this
Issue Time Tracking
-------------------
Worklog Id: (was: 772931)
Time Spent: 7h (was: 6h 50m)
> Improve error message surrounding primary returns in the self-checkpointing code
> --------------------------------------------------------------------------------
>
> Key: BEAM-14484
> URL: https://issues.apache.org/jira/browse/BEAM-14484
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Jack McCluskey
> Assignee: Jack McCluskey
> Priority: P1
> Time Spent: 7h
> Remaining Estimate: 0h
>
> The error message in the Go SDK harness around returned primaries in the
> self-checkpointing code
> (https://github.com/apache/beam/blob/ea1f292e9cf31fc8c4803b10d811f0d3ee184ae7/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L375)
> is unclear and should be made more explicit. It should also guide the user
> towards making sure that the restriction behaves properly in the
> self-checkpointing case.