kennknowles commented on PR #16773:
URL: https://github.com/apache/beam/pull/16773#issuecomment-1096900856

   > > I looked into this, and I think the right fix is to have a separate 
ReadBoundedFromKafkaDoFn that indicates statically that it is bounded per 
element.
   
   > As far as I understand:
   > `ReadFromKafkaDoFn` is the `DoFn` we are talking about that has 
`@UnboundedPerElement` right now. It influences bounded/unbounded nature by 
creating `OffsetRangeTracker` vs `GrowableOffsetRangeTracker` in 
`ReadFromKafkaDoFn.restrictionTracker(...)`. That calculation is being 
influenced by the `OffsetRange` input which is being created in 
`ReadFromKafkaDoFn.initialRestriction(...)` based on the properties of the 
input `KafkaSourceDescriptor`. `KafkaSourceDescriptor` is the `@Element` 
itself, so it's a runtime value. How would you determine during the beam graph 
building phase that the runtime value that is being dynamically 
generated/created will be unbounded or bounded? I mean when the developer 
provides the stopReadTime through the KafkaIO methods it is doable, but we also 
have `KafkaIO.readSourceDescriptors()` - in that case `KafkaIO` has no idea if 
the descriptors will be unbounded/bounded so automatic detection can't have a 
100% coverage. Or did you mean to make a hybrid sol
 ution where it tries to detect it, but can be overridden manually?
   
   Thanks for all the details. Just to make sure the premises are clear:
   
    - Bounded/unbounded is by definition a static property of a PCollection. 
Bounded means it is finite and you can know it statically. Unbounded means it 
could be finite or infinite and you can only find out dynamically.
    - Bounded/unbounded output per element is a static property of a DoFn
    - A DoFn must declare whether it is bounded or unbounded per element, and 
is responsible for adhering to that contract, or it may crash or hang or any 
number of problems.
   
   So two separate code paths - either entry points or a conditional based on 
`stopReadTime` - provides two static contexts for setting whether it will be 
bounded or unbounded.
   
   I think if you start at the `DoFn` and work outwards you would find all the 
things that need to be forked in order to plumb the boundedness all the way 
through. For example `readSourceDescriptors` may need either a flag saying "all 
descriptors will be bounded" or a forked version that means the same thing.
   
   > > we would want the above change (which is safe) along with this experiment
   > 
   > I don't get this. If the bounded/unbounded separation is implemented, why 
would having this experiment make any sense for `KafkaIO`? The 
bounded/unbounded would be already handled properly according to the annotation 
- so there would be no batch->streaming protection to prevent.
   
   I agree. Not sure what I was thinking about. I thought there was still a 
case where you end up using this to force something. But ideally you would not 
need it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to