damccorm commented on code in PR #17956:
URL: https://github.com/apache/beam/pull/17956#discussion_r889155710


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit 
func(Record)) sdf.ProcessContinuation {

Review Comment:
   Can we put this in the snippets folder (example below in Watermark 
estimation section)? I know we haven't been clean on that before, but it:
   
   (a) makes sure that the code actually compiles
   (b) makes it easier to reuse (e.g. I know Dataflow has docs that use 
snippets from Beam)



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit 
func(Record)) sdf.ProcessContinuation {
+  position := rt.GetRestriction().(offsetrange.Restriction).Start
+  for {
+    records, err := fn.ExternalService.readNextRecords(position)
+    if err == fn.ExternalService.ThrottlingErr {
+      return sdf.ResumeProcessingIn(60 * time.Seconds)
+    }
+    if len(records) == 0 {
+      return sdf.ResumeProcessingIn(10 * time.Seconds)

Review Comment:
   Maybe add a comment along the lines of `// Wait for data to be available`? 
Might be nice to have a similar comment for the throttling case and the finish 
execution case as well.



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit 
func(Record)) sdf.ProcessContinuation {

Review Comment:
   Could you return an `err` parameter as well (it can just return nil)? 
Something I realized w/ Bundle Finalization is that its much more helpful if we 
provide the parameters that surround the one we are demonstrating because it 
allows users to see the ordering we require.
   
   Side note unrelated to this PR: We probably need better ordering error 
messages, they are pretty confusing right now.



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit 
func(Record)) sdf.ProcessContinuation {

Review Comment:
   I'm actually also curious about what process continuation we should return 
when we return an err response actually - is it nil? Might be worth including 
that as an option if for example, `records, err := 
fn.ExternalService.readNextRecords(position)` returns a non-nil, non-throttling 
error respone



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to