nbali commented on PR #16773:
URL: https://github.com/apache/beam/pull/16773#issuecomment-1095646442

   > I looked into this, and I think the right fix is to have a separate 
`ReadBoundedFromKafkaDoFn` that indicates statically that it is bounded per 
element.
   
   As far as I understand:
   `ReadFromKafkaDoFn` is the `DoFn` we are talking about that has 
`@UnboundedPerElement` right now. It influences bounded/unbounded nature by 
creating `OffsetRangeTracker` vs `GrowableOffsetRangeTracker` in 
`ReadFromKafkaDoFn.restrictionTracker(...)`. That choice depends on the 
`OffsetRange` input, which is created in 
`ReadFromKafkaDoFn.initialRestriction(...)` based on the properties of the 
input `KafkaSourceDescriptor`. `KafkaSourceDescriptor` is the `@Element` 
itself, so it's a runtime value. How would you determine during the Beam graph 
building phase whether a runtime value that is generated dynamically will be 
unbounded or bounded? When the developer provides the `stopReadTime` through 
the `KafkaIO` methods it is doable, but we also have 
`KafkaIO.readSourceDescriptors()` - in that case `KafkaIO` has no idea whether 
the descriptors will be unbounded or bounded, so automatic detection can't have 
100% coverage. Or did you mean a hybrid solution where it tries to detect 
it, but can be overridden manually?
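
   To make the concern concrete, here is a minimal sketch of the flow as I understand it. `BoundednessSketch`, `Descriptor`, `Range` and `TrackerKind` are hypothetical stand-ins written for this comment, not the Beam classes, and using `Long.MAX_VALUE` as the "no stop" sentinel is an assumption about how the open-ended case is represented.

```java
import java.util.OptionalLong;

// Simplified stand-ins for illustration only; these are NOT the Beam classes.
final class BoundednessSketch {

  /** Hypothetical, trimmed-down analogue of KafkaSourceDescriptor (the runtime element). */
  static final class Descriptor {
    final long startOffset;
    final OptionalLong stopOffset; // empty means "read forever"

    Descriptor(long startOffset, OptionalLong stopOffset) {
      this.startOffset = startOffset;
      this.stopOffset = stopOffset;
    }
  }

  /** Hypothetical analogue of OffsetRange: the half-open interval [from, to). */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  enum TrackerKind { BOUNDED, GROWABLE }

  // Plays the role of initialRestriction(...): the range end comes from the element.
  static Range initialRestriction(Descriptor d) {
    return new Range(d.startOffset, d.stopOffset.orElse(Long.MAX_VALUE));
  }

  // Plays the role of restrictionTracker(...): the tracker kind comes from the range.
  static TrackerKind restrictionTracker(Range r) {
    return r.to < Long.MAX_VALUE ? TrackerKind.BOUNDED : TrackerKind.GROWABLE;
  }

  public static void main(String[] args) {
    Descriptor withStop = new Descriptor(0L, OptionalLong.of(100L));
    Descriptor withoutStop = new Descriptor(0L, OptionalLong.empty());
    // Both elements can flow through the same DoFn instance at runtime.
    System.out.println(restrictionTracker(initialRestriction(withStop)));
    System.out.println(restrictionTracker(initialRestriction(withoutStop)));
  }
}
```

   In this sketch the same code path yields a bounded tracker for one element and a growable one for the next, which is why a single class-level annotation cannot describe both cases.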
   
   
   > we would want the above change (which is safe) along with this experiment
   
   I don't get this. If the bounded/unbounded separation is implemented, why 
would this experiment still make sense for `KafkaIO`? Boundedness would 
already be handled properly according to the annotation, so there would be no 
batch->streaming switch left to prevent.
   
   
   
   


