[
https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=778204&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778204
]
ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Jun/22 16:59
Start Date: 03/Jun/22 16:59
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17956:
URL: https://github.com/apache/beam/pull/17956#discussion_r889155710
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
Review Comment:
Can we put this in the snippets folder (see the example below in the Watermark
estimation section)? I know we haven't been consistent about that before, but it:
(a) makes sure that the code actually compiles
(b) makes it easier to reuse (e.g. I know Dataflow has docs that use
snippets from Beam)
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
+ position := rt.GetRestriction().(offsetrange.Restriction).Start
+ for {
+ records, err := fn.ExternalService.readNextRecords(position)
+ if err == fn.ExternalService.ThrottlingErr {
+ return sdf.ResumeProcessingIn(60 * time.Second)
+ }
+ if len(records) == 0 {
+ return sdf.ResumeProcessingIn(10 * time.Second)
Review Comment:
Maybe add a comment along the lines of `// Wait for data to be available`?
Might be nice to have a similar comment for the throttling case and the finish
execution case as well.
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
Review Comment:
Could you return an `err` value as well (it can just be nil)?
Something I realized with Bundle Finalization is that it's much more helpful if we
provide the parameters that surround the one we are demonstrating, because it
allows users to see the ordering we require.
Side note unrelated to this PR: we probably need better ordering error
messages; they are pretty confusing right now.
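The ordering point can be illustrated with a small stdlib-only Go sketch. It assumes the rule is the usual Go convention that an `error` return comes last, as in `(sdf.ProcessContinuation, error)`. `continuation` and `checkErrorLast` are hypothetical. This is not how the Beam SDK validates `ProcessElement` signatures. It only shows an ordering error message that names the offending return position.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation struct {
	resume bool
	delay  time.Duration
}

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// checkErrorLast is a hypothetical validator (not Beam's). It rejects any
// function whose error return is not the last one, and it reports exactly
// which position is wrong instead of a generic signature mismatch.
func checkErrorLast(fn interface{}) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return fmt.Errorf("expected a function, got %v", t)
	}
	last := t.NumOut() - 1
	for i := 0; i < t.NumOut(); i++ {
		if t.Out(i) == errorType && i != last {
			return fmt.Errorf("return value %d of %v is an error; the error must be the last return value", i, t)
		}
	}
	return nil
}

// good uses the suggested shape: continuation first, error last (nil here).
func good(emit func(string)) (continuation, error) { return continuation{}, nil }

// bad swaps the two return values.
func bad(emit func(string)) (error, continuation) { return nil, continuation{} }

func main() {
	fmt.Println(checkErrorLast(good))
	fmt.Println(checkErrorLast(bad))
}
```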
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
Review Comment:
I'm also curious about which process continuation we should return
when we return an error: is it nil? It might be worth including
that case as well, for example when
`records, err := fn.ExternalService.readNextRecords(position)` returns a
non-nil, non-throttling error response.
Issue Time Tracking
-------------------
Worklog Id: (was: 778204)
Time Spent: 27h 10m (was: 27h)
> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
> Key: BEAM-11104
> URL: https://issues.apache.org/jira/browse/BEAM-11104
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Jack McCluskey
> Priority: P3
> Fix For: 2.40.0
>
> Time Spent: 27h 10m
> Remaining Estimate: 0h
>
> Allow SplittableDoFns to self-checkpoint.
> Design doc:
> [https://docs.google.com/document/d/1_JbzjY9JR07ZK5v7PcZevUfzHPsqwzfV7W6AouNpMPk/edit?usp=sharing]
>
> The feature is written end to end (E2E), and users will be able to return
> ProcessContinuations from SDFs as of 2.39.0, but its behavior has not been
> fully validated. An integration test that validates self-checkpointing is
> working as intended will need to be written and passing before the feature
> is no longer considered experimental and this ticket is marked as resolved.