lostluck commented on code in PR #25160:
URL: https://github.com/apache/beam/pull/25160#discussion_r1089557996
##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -442,53 +266,53 @@ func (fn *readFn) ProcessElement(
}()
for cursor.Next(ctx) {
- value, err := decodeDocument(cursor, fn.Type.T)
+ id, value, err := decodeDocument(cursor, fn.Type.T)
if err != nil {
return err
}
- emit(value)
- }
-
- return cursor.Err()
-}
+ result := cursorResult{nextID: id}
+ if !rt.TryClaim(result) {
+ return cursor.Err()
+ }
-func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M {
- if len(idFilter) == 0 {
- return customFilter
+ emit(value)
}
- if len(customFilter) == 0 {
- return idFilter
- }
+ result := cursorResult{isExhausted: true}
+ rt.TryClaim(result)
Review Comment:
OK seeing everything in context, I agree 100% this is the correct move
because the tracker is managing itself around the external cursor, and can't do
the more elegant approach. Ideally, the "last" call to TryClaim that returns
true will automatically ensure subsequent claim calls return false.
"This is the last position. I'll return true, but set my state as stopped
for future iterations".
##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -442,53 +266,53 @@ func (fn *readFn) ProcessElement(
}()
for cursor.Next(ctx) {
- value, err := decodeDocument(cursor, fn.Type.T)
+ id, value, err := decodeDocument(cursor, fn.Type.T)
if err != nil {
return err
}
- emit(value)
- }
-
- return cursor.Err()
-}
+ result := cursorResult{nextID: id}
+ if !rt.TryClaim(result) {
+ return cursor.Err()
+ }
Review Comment:
I was going to comment that this should be *before* the work of decoding,
but since we appear to not know which ID we're claiming until after decoding,
this is fine.
Essentially, work should happen after a successful claim, but we're stuck
with the API's we're given, and working non-ideal code is better than
non-existent code that does what we need.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]