lostluck commented on code in PR #25160:
URL: https://github.com/apache/beam/pull/25160#discussion_r1089492429


##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -442,53 +266,53 @@ func (fn *readFn) ProcessElement(
        }()
 
        for cursor.Next(ctx) {
-               value, err := decodeDocument(cursor, fn.Type.T)
+               id, value, err := decodeDocument(cursor, fn.Type.T)
                if err != nil {
                        return err
                }
 
-               emit(value)
-       }
-
-       return cursor.Err()
-}
+               result := cursorResult{nextID: id}
+               if !rt.TryClaim(result) {
+                       return cursor.Err()
+               }
 
-func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M {
-       if len(idFilter) == 0 {
-               return customFilter
+               emit(value)
        }
 
-       if len(customFilter) == 0 {
-               return idFilter
-       }
+       result := cursorResult{isExhausted: true}
+       rt.TryClaim(result)

Review Comment:
   I think you're right in this case. 
   
   Formally, the promise around TryClaim is "if TryClaim returns successfully, I will process the claimed position". So if the underlying data/job mapping changes in a way that invalidates the restriction, it's reasonable to simply discard previously unknown input, since it wasn't there to start with.
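   
   To make that contract concrete, here's a minimal, standalone sketch of the claim-then-process loop, using the SDK's `offsetrange` tracker rather than this PR's `cursorResult` tracker, purely for illustration:

```go
package main

import (
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

func main() {
	// The restriction covers positions [0, 3).
	tracker := offsetrange.NewTracker(offsetrange.Restriction{Start: 0, End: 3})
	for pos := int64(0); ; pos++ {
		if !tracker.TryClaim(pos) {
			// Claim rejected: the restriction is exhausted (or was split
			// away from us), so we discard this position and stop.
			break
		}
		// Claim accepted: we're now obligated to process this position.
		fmt.Println("processing position", pos)
	}
}
```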
   
   As for process continuations, while they *technically* can be used for batch WRT the global window, you're right that support for them is largely for handling streaming, which is how Dataflow interprets transforms that can return process continuations. It's possible that this is a bug in the Go SDK's interpretation, however, and that it's the combination of a Process Continuation and an Unbounded Restriction that should lead to a streaming interpretation; but that would need to be vetted against existing runners. Unfortunately, the documentation around that part of the model for SDK developers is spotty at best.
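   
   For reference, in the Go SDK that streaming signal comes from returning an `sdf.ProcessContinuation` out of `ProcessElement`. A hedged sketch of the pattern (the `watchFn` type and `poll` helper are made up for illustration):

```go
import (
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
)

// watchFn is a hypothetical self-checkpointing SDF over some source.
type watchFn struct{}

// poll is a stand-in for fetching whatever new data is available.
func poll(source string) []string { return nil }

func (fn *watchFn) ProcessElement(rt *sdf.LockRTracker, source string, emit func(string)) sdf.ProcessContinuation {
	for _, item := range poll(source) {
		if !rt.TryClaim(item) {
			// Restriction exhausted or split away: stop for good.
			return sdf.StopProcessing()
		}
		emit(item)
	}
	// No more data *yet*: checkpoint and ask the runner to resume later.
	// The ability to return this is what Dataflow reads as "streaming".
	return sdf.ResumeProcessingIn(10 * time.Second)
}
```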
   
   -------
   
   WRT actual splitting behavior, I know Dataflow is currently the only runner that does dynamic splitting. I believe both Flink and Spark will do the initial splitting stage, which amounts to each element getting its initial restriction split immediately after the restriction is created.
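   
   For context, that initial split corresponds to the `SplitRestriction` method on the restriction provider. A sketch with a hypothetical `exampleFn`, using the SDK's `offsetrange` restriction for illustration rather than this PR's ID-range restriction:

```go
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"

// exampleFn is a hypothetical SDF whose restriction is an offset range.
type exampleFn struct{}

// SplitRestriction is called once per element before processing starts.
// On runners without dynamic splitting (e.g. Flink, Spark), these initial
// splits are all the parallelism the element's restriction will ever get.
func (fn *exampleFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
	// Split evenly into 4 sub-restrictions; the count is illustrative.
	return rest.EvenSplits(4)
}
```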
   
   This is typically fine. Dynamic splitting (aka Liquid Sharding) shows most of its benefit in avoiding stragglers, so that all the work finishes closer together, providing better utilization of workers and faster completion of the job.


