damccorm commented on code in PR #17956:
URL: https://github.com/apache/beam/pull/17956#discussion_r889155710
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
Review Comment:
Can we put this in the snippets folder (see the example below in the Watermark
estimation section)? I know we haven't been consistent about that before, but it:
(a) makes sure that the code actually compiles
(b) makes it easier to reuse (e.g. I know Dataflow has docs that use
snippets from Beam)
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
+ position := rt.GetRestriction().(offsetrange.Restriction).Start
+ for {
+ records, err := fn.ExternalService.readNextRecords(position)
+ if err == fn.ExternalService.ThrottlingErr {
+ return sdf.ResumeProcessingIn(60 * time.Seconds)
+ }
+ if len(records) == 0 {
+ return sdf.ResumeProcessingIn(10 * time.Seconds)
Review Comment:
Maybe add a comment along the lines of `// Wait for data to be available`?
Might be nice to have a similar comment for the throttling case and the finish
execution case as well.
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
Review Comment:
Could you return an `err` parameter as well (it can just return nil)?
Something I realized with Bundle Finalization is that it's much more helpful if
we provide the parameters that surround the one we are demonstrating, because it
allows users to see the ordering we require.
Side note unrelated to this PR: we probably need better ordering error
messages; they are pretty confusing right now.
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
Review Comment:
I'm also curious about what process continuation we should return when we
return an error: is it nil? It might be worth including that as an option if,
for example, `records, err :=
fn.ExternalService.readNextRecords(position)` returns a non-nil, non-throttling
error response.