je-ik commented on pull request #13592:
URL: https://github.com/apache/beam/pull/13592#issuecomment-749735258


   > I was thinking about using `Coder#structuralValue`, but I'm concerned 
about the additional overhead from encoding. The cacheKey is created very 
frequently (at least twice per element), and encoding a value with a coder is 
not cheap.
   
   That should not be a problem: in most cases, processing a single `element` 
here means emitting many elements downstream, so the cost of 
`structuralValue` should be largely amortized, I would say. Could we also 
arrange to compute it only once per element?
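   One way to get "only once per element" is to memoize the encoded key on a per-element wrapper. The plain-Java sketch below illustrates that idea; it does not use Beam APIs. `ByteEncoder` is a hypothetical stand-in for a Beam `Coder`, and `EncodedKey` and `KeyedElement` are hypothetical helpers that only mimic the spirit of a structural value (equality over the encoded bytes).

```java
import java.util.Arrays;

/** Hypothetical stand-in for a Beam Coder: turns a value into bytes. */
interface ByteEncoder<T> {
  byte[] encode(T value);
}

/** Key with value semantics over an encoded form (equality on the bytes, not on the object). */
final class EncodedKey {
  private final byte[] bytes;

  EncodedKey(byte[] bytes) {
    this.bytes = bytes.clone(); // defensive copy so the key is immutable
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof EncodedKey && Arrays.equals(bytes, ((EncodedKey) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }
}

/** Hypothetical wrapper: encodes its element at most once, however often the key is requested. */
final class KeyedElement<T> {
  private final T value;
  private final ByteEncoder<T> encoder;
  private EncodedKey key; // computed lazily on first use, then reused (not thread-safe)

  KeyedElement(T value, ByteEncoder<T> encoder) {
    this.value = value;
    this.encoder = encoder;
  }

  EncodedKey cacheKey() {
    if (key == null) {
      key = new EncodedKey(encoder.encode(value));
    }
    return key;
  }
}
```

With this shape, the two (or more) cacheKey lookups per element cost a single encoding, and two elements that encode to the same bytes still map to the same cache entry.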
   > 
   > As you mentioned, a DoFn instance could process multiple sources, 
especially if the source allows an initial split (and we cannot assume that the 
CheckpointMark contains the source info, although in most cases it does). That's 
why I decided to use `UnboundedSourceRestriction` to locate a reader. 
DirectRunner uses timers (InMemoryTimerInternals) and state (also in memory) 
to reschedule checkpoints. Shouldn't it be a reference rather than a deep 
copy?
   
   I don't think we can rely on a reference either, because the UnboundedSource 
is Serializable and any runner is free to clone it (which is what DirectRunner 
does, afaik). The best solution would seem to be to tag each initially split 
restriction (via @SplitRestriction) with a unique ID and then carry that ID over 
to all residual restrictions. There should always be at most one "active" 
restriction per ID (either currently being processed or having a non-null 
residual), so we could use the ID to identify the reader without referencing the 
Source. Referencing the Source might be problematic, as implementing hashCode 
and/or equals for UnboundedSource will not be common practice, I'm afraid.
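   A rough plain-Java sketch of the ID-tagging idea, assuming hypothetical `OpaqueSource`, `TaggedRestriction`, `Splitting` and `ReaderCache` types (none of these are Beam classes). `roundTrip` mimics a runner cloning a restriction through Java serialization; after the clone, the source is a new object with identity-based `equals`, but the cached reader can still be found through the carried-over ID.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical source without equals/hashCode, like many UnboundedSource implementations. */
final class OpaqueSource implements Serializable {
  final String topic;

  OpaqueSource(String topic) {
    this.topic = topic;
  }
}

/** Hypothetical restriction tagged with an ID that survives cloning and is shared by residuals. */
final class TaggedRestriction implements Serializable {
  final String readerId;
  final OpaqueSource source;
  final long checkpoint; // stand-in for a CheckpointMark

  TaggedRestriction(String readerId, OpaqueSource source, long checkpoint) {
    this.readerId = readerId;
    this.source = source;
    this.checkpoint = checkpoint;
  }

  /** A residual keeps the same readerId, so it maps to the same cached reader. */
  TaggedRestriction residual(long newCheckpoint) {
    return new TaggedRestriction(readerId, source, newCheckpoint);
  }
}

final class Splitting {
  /** Initial split: every split gets a fresh unique ID. */
  static List<TaggedRestriction> initialSplit(List<OpaqueSource> splits) {
    List<TaggedRestriction> out = new ArrayList<>();
    for (OpaqueSource s : splits) {
      out.add(new TaggedRestriction(UUID.randomUUID().toString(), s, 0L));
    }
    return out;
  }

  /** Mimics a runner deep-copying a restriction via Java serialization. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}

/** Hypothetical per-DoFn-instance cache of open readers, keyed by readerId only. */
final class ReaderCache<R> {
  private final Map<String, R> readers = new HashMap<>();

  R get(TaggedRestriction r) {
    return readers.get(r.readerId);
  }

  void put(TaggedRestriction r, R reader) {
    readers.put(r.readerId, reader);
  }
}
```

Keying by the ID means neither the source nor the checkpoint mark needs a meaningful `equals`/`hashCode`; the trade-off is that the ID has to be threaded through every residual restriction.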
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.