je-ik commented on pull request #13592: URL: https://github.com/apache/beam/pull/13592#issuecomment-749735258
> I was thinking about using `Coder#structuralValue`, but I'm concerned about the additional overhead from encoding. The cacheKey is created very frequently (at least twice per element) and it's not cheap for a coder to encode a value.

That should not be a problem: in most cases, processing a single `element` here means emitting quite a few elements downstream, so the cost of the structuralValue should be pretty much amortized, I would say. Could we do something to calculate it only once per element?

> As you mentioned, a DoFn instance could process multiple sources, especially if the source allows initial splitting (and we cannot assume that CheckpointMark contains the source info, although in most cases it does). That's why I decided to use `UnboundedSourceRestriction` to locate a reader.
>
> DirectRunner is using timers (InMemoryTimerInternals) and states (in memory as well) to reschedule checkpoints. It should be a reference instead of a deep copy?

I don't think we can use a reference either, because the UnboundedSource is Serializable and any runner is free to clone it (which is what DirectRunner does, afaik). The best solution would seem to be to mark each initially split restriction (via `@SplitRestriction`) with a unique ID and then transfer this ID to all residual restrictions. There should always be at most one "active" restriction (either currently being processed or having a non-null residual), so we could use that ID to identify the reader without referencing the Source (which might be problematic, as implementing hashCode and/or equals for UnboundedSource will not be common practice, I'm afraid).
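To make the unique-ID idea concrete, here is a rough sketch in plain Java with no Beam dependencies. `Restriction`, `Reader` and `ReaderCache` are hypothetical stand-ins invented for illustration (they are not `UnboundedSourceRestriction` or any existing Beam class), and the `long` checkpoint is just a placeholder for a real CheckpointMark. It only shows the shape of the proposal: an ID assigned once at initial split, copied to every residual, and used as the cache key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class SplitIdSketch {

  /** Hypothetical stand-in for a restriction; NOT the Beam class. */
  static final class Restriction {
    final String splitId;   // assigned once, at initial split time
    final long checkpoint;  // placeholder for a CheckpointMark

    private Restriction(String splitId, long checkpoint) {
      this.splitId = splitId;
      this.checkpoint = checkpoint;
    }

    /** Models the initial split: each split gets a fresh unique ID. */
    static List<Restriction> initialSplits(int count) {
      List<Restriction> splits = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        splits.add(new Restriction(UUID.randomUUID().toString(), 0L));
      }
      return splits;
    }

    /** A residual carries a new checkpoint but inherits the same ID. */
    Restriction residual(long newCheckpoint) {
      return new Restriction(splitId, newCheckpoint);
    }
  }

  /** Hypothetical reader, created from the restriction it first sees. */
  static final class Reader {
    final String splitId;
    final long startedAt;

    Reader(String splitId, long startedAt) {
      this.splitId = splitId;
      this.startedAt = startedAt;
    }
  }

  /** Cache keyed by split ID only; the source is never hashed or compared. */
  static final class ReaderCache {
    private final Map<String, Reader> readers = new HashMap<>();
    int created = 0;

    Reader readerFor(Restriction restriction) {
      return readers.computeIfAbsent(
          restriction.splitId,
          id -> {
            created++;
            return new Reader(id, restriction.checkpoint);
          });
    }
  }

  public static void main(String[] args) {
    List<Restriction> splits = Restriction.initialSplits(2);
    ReaderCache cache = new ReaderCache();
    Reader first = cache.readerFor(splits.get(0));
    Reader resumed = cache.readerFor(splits.get(0).residual(42L));
    System.out.println("reader reused for residual: " + (first == resumed));
    System.out.println("readers created: " + cache.created);
  }
}
```

Because the key is a plain String, it survives serialization and cloning of the restriction by the runner, and building it does not require encoding anything with a coder. Eviction of closed readers is left out of the sketch.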
