[ https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=778204&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778204 ]

ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jun/22 16:59
            Start Date: 03/Jun/22 16:59
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17956:
URL: https://github.com/apache/beam/pull/17956#discussion_r889155710


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {

Review Comment:
   Can we put this in the snippets folder (example below in Watermark 
estimation section)? I know we haven't been clean on that before, but it:
   
   (a) makes sure that the code actually compiles
   (b) makes it easier to reuse (e.g. I know Dataflow has docs that use 
snippets from Beam)



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
+  position := rt.GetRestriction().(offsetrange.Restriction).Start
+  for {
+    records, err := fn.ExternalService.readNextRecords(position)
+    if err == fn.ExternalService.ThrottlingErr {
+      return sdf.ResumeProcessingIn(60 * time.Second)
+    }
+    if len(records) == 0 {
+      return sdf.ResumeProcessingIn(10 * time.Second)

Review Comment:
   Maybe add a comment along the lines of `// Wait for data to be available`? 
Might be nice to have a similar comment for the throttling case and the finish 
execution case as well.



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {

Review Comment:
   Could you return an `err` parameter as well (it can just return nil)? 
Something I realized w/ Bundle Finalization is that it's much more helpful if we 
provide the parameters that surround the one we are demonstrating, because that 
lets users see the ordering we require.
   
   Side note unrelated to this PR: We probably need better ordering error 
messages; they are pretty confusing right now.
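The reviewer's point is that a snippet should show the return values around the one being demonstrated, so users can see the required order. The Go SDK checks `ProcessElement` signatures by reflection and expects a returned error to come last. Its actual validation rules are not reproduced here.

As a hypothetical illustration of that ordering, the sketch below uses `reflect` to test whether a method returns exactly `(ProcessContinuation, error)`. `ProcessContinuation`, `Tracker`, `Record`, `prShapeDoFn`, and `returnsContinuationThenError` are local stand-ins, not Beam APIs.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// Local stand-ins for the sdf/offsetrange types; not Beam's own.
type ProcessContinuation struct {
	Resume bool
	Delay  time.Duration
}

type Tracker struct{ Start, End int64 }

type Record string

// splittableDoFn spells out every return value, with error last.
type splittableDoFn struct{}

func (fn *splittableDoFn) ProcessElement(rt *Tracker, emit func(Record)) (ProcessContinuation, error) {
	// No failure to report in this sketch, so the error is simply nil.
	return ProcessContinuation{}, nil
}

// prShapeDoFn mirrors the signature first proposed in the PR (no error).
type prShapeDoFn struct{}

func (fn *prShapeDoFn) ProcessElement(rt *Tracker, emit func(Record)) ProcessContinuation {
	return ProcessContinuation{}
}

var (
	continuationType = reflect.TypeOf(ProcessContinuation{})
	errorType        = reflect.TypeOf((*error)(nil)).Elem()
)

// returnsContinuationThenError is a hypothetical check, not the SDK's own
// validator: it reports whether the named method returns exactly
// (ProcessContinuation, error), in that order.
func returnsContinuationThenError(doFn interface{}, method string) bool {
	m, ok := reflect.TypeOf(doFn).MethodByName(method)
	if !ok {
		return false
	}
	return m.Type.NumOut() == 2 &&
		m.Type.Out(0) == continuationType &&
		m.Type.Out(1) == errorType
}

func main() {
	fmt.Println(returnsContinuationThenError(&splittableDoFn{}, "ProcessElement"))
	fmt.Println(returnsContinuationThenError(&prShapeDoFn{}, "ProcessElement"))
}
```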



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {

Review Comment:
   I'm also curious about which process continuation we should return when we 
return an error. Is it nil? It might be worth including that as an option, for 
example when `records, err := fn.ExternalService.readNextRecords(position)` 
returns a non-nil, non-throttling error response.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 778204)
    Time Spent: 27h 10m  (was: 27h)

> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
>                 Key: BEAM-11104
>                 URL: https://issues.apache.org/jira/browse/BEAM-11104
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>             Fix For: 2.40.0
>
>          Time Spent: 27h 10m
>  Remaining Estimate: 0h
>
> Allow SplittableDoFns to self-checkpoint.
> Design doc: 
> [https://docs.google.com/document/d/1_JbzjY9JR07ZK5v7PcZevUfzHPsqwzfV7W6AouNpMPk/edit?usp=sharing]
>  
> The feature is implemented end to end, and users will be able to return 
> ProcessContinuations from SDFs as of 2.39.0, but the behavior has not been 
> fully validated. An integration test that validates that self-checkpointing 
> works as intended must be written and passing before the feature is no longer 
> considered experimental and this ticket is marked as resolved.
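The ticket asks for an integration test before the feature stops being experimental. Such a test would run on a real runner. The sketch below is a much smaller, hypothetical illustration of the property that test would check.

It plays the runner's role in-process. Whenever `processElement` returns a resume continuation, it re-invokes the function on the residual restriction. The test then asserts that every offset in the original range is emitted exactly once. The tracker, `source`, and `runToCompletion` driver are stand-ins, not Beam APIs, and the requested resume delay is ignored.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for sdf.ProcessContinuation and its constructors.
type ProcessContinuation struct {
	Resume bool
	Delay  time.Duration
}

func ResumeProcessingIn(d time.Duration) ProcessContinuation {
	return ProcessContinuation{Resume: true, Delay: d}
}

func StopProcessing() ProcessContinuation { return ProcessContinuation{} }

// Tracker is a simplified offset-range tracker over [Start, End) that
// remembers the next unclaimed offset so a residual can be split off.
type Tracker struct{ Start, End, next int64 }

func NewTracker(start, end int64) *Tracker { return &Tracker{Start: start, End: end, next: start} }

func (t *Tracker) TryClaim(pos int64) bool {
	if pos < t.Start || pos >= t.End {
		return false
	}
	t.next = pos + 1
	return true
}

// Residual is the unprocessed remainder [next, End) after a checkpoint.
func (t *Tracker) Residual() *Tracker { return NewTracker(t.next, t.End) }

// source is a hypothetical unbounded stream: offsets below avail are readable,
// and avail grows by step each time the driver resumes.
type source struct{ avail, step int64 }

func (s *source) read(pos int64) []int64 {
	var out []int64
	for p := pos; p < s.avail; p++ {
		out = append(out, p)
	}
	return out
}

func processElement(rt *Tracker, src *source, emit func(int64)) (ProcessContinuation, error) {
	pos := rt.Start
	for {
		records := src.read(pos)
		if len(records) == 0 {
			// Wait for data to be available.
			return ResumeProcessingIn(10 * time.Second), nil
		}
		for _, r := range records {
			if !rt.TryClaim(pos) {
				// The restriction is exhausted: finish execution.
				return StopProcessing(), nil
			}
			emit(r)
			pos++
		}
	}
}

// runToCompletion plays the runner's role. After each self-checkpoint it
// re-invokes processElement on the residual (ignoring the requested delay)
// and returns everything emitted plus the number of checkpoints taken.
func runToCompletion(start, end int64, src *source, maxRounds int) ([]int64, int, error) {
	var emitted []int64
	rt := NewTracker(start, end)
	for round := 0; round < maxRounds; round++ {
		pc, err := processElement(rt, src, func(r int64) { emitted = append(emitted, r) })
		if err != nil {
			return emitted, round, err
		}
		if !pc.Resume {
			return emitted, round, nil
		}
		rt = rt.Residual()
		src.avail += src.step
	}
	return emitted, maxRounds, errors.New("restriction not finished within maxRounds")
}

func main() {
	out, checkpoints, err := runToCompletion(0, 10, &source{avail: 3, step: 4}, 100)
	fmt.Println(out, checkpoints, err)
}
```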



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
