nbali commented on PR #16773: URL: https://github.com/apache/beam/pull/16773#issuecomment-1095646442
> I looked into this, and I think the right fix is to have a separate `ReadBoundedFromKafkaDoFn` that indicates statically that it is bounded per element.

As far as I understand: `ReadFromKafkaDoFn` is the `DoFn` we are talking about that has `@UnboundedPerElement` right now. It influences the bounded/unbounded nature by creating an `OffsetRangeTracker` vs. a `GrowableOffsetRangeTracker` in `ReadFromKafkaDoFn.restrictionTracker(...)`. That choice depends on the `OffsetRange` input, which is created in `ReadFromKafkaDoFn.initialRestriction(...)` based on the properties of the input `KafkaSourceDescriptor`. `KafkaSourceDescriptor` is the `@Element` itself, so it's a runtime value.

How would you determine during the Beam graph building phase whether a runtime value that is dynamically generated will be unbounded or bounded? When the developer provides the `stopReadTime` through the `KafkaIO` methods it is doable, but we also have `KafkaIO.readSourceDescriptors()`. In that case `KafkaIO` has no idea whether the descriptors will be unbounded or bounded, so automatic detection can't have 100% coverage. Or did you mean to make a hybrid solution where it tries to detect it, but can be overridden manually?

> we would want the above change (which is safe) along with this experiment

I don't get this. If the bounded/unbounded separation is implemented, why would having this experiment make any sense for `KafkaIO`? Bounded/unbounded would already be handled properly according to the annotation, so there would be no batch->streaming protection to prevent.
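To make the runtime-value concern concrete, here is a minimal, self-contained sketch. It does not use the real Beam classes: `Descriptor`, `Range`, `initialRestriction` and `trackerFor` are hypothetical stand-ins, and the rule "an end offset of `Long.MAX_VALUE` means unbounded" is an assumption made for illustration, not a statement about the actual `ReadFromKafkaDoFn` code.

```java
import java.util.OptionalLong;

// Simplified stand-ins only; none of these types are the real Beam classes.
class BoundednessSketch {

    /** Hypothetical stand-in for KafkaSourceDescriptor: a topic plus an optional stop offset. */
    static final class Descriptor {
        final String topic;
        final OptionalLong stopOffset;

        Descriptor(String topic, OptionalLong stopOffset) {
            this.topic = topic;
            this.stopOffset = stopOffset;
        }
    }

    /** Hypothetical stand-in for OffsetRange. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Assumption: a descriptor without a stop offset yields an open-ended range. */
    static Range initialRestriction(Descriptor descriptor) {
        return new Range(0L, descriptor.stopOffset.orElse(Long.MAX_VALUE));
    }

    /** Returns the name of the tracker kind that would be chosen for the given range. */
    static String trackerFor(Range range) {
        return range.to < Long.MAX_VALUE
            ? "OffsetRangeTracker"           // finite end offset: bounded work
            : "GrowableOffsetRangeTracker";  // open-ended: unbounded work
    }

    public static void main(String[] args) {
        // Both descriptors only exist at runtime, as elements flowing into the DoFn.
        Descriptor bounded = new Descriptor("events", OptionalLong.of(1_000L));
        Descriptor unbounded = new Descriptor("events", OptionalLong.empty());

        System.out.println(trackerFor(initialRestriction(bounded)));
        System.out.println(trackerFor(initialRestriction(unbounded)));
    }
}
```

In this sketch the same code path yields either tracker kind depending solely on the element's contents, which is why a single static annotation on the `DoFn` class cannot describe both cases when the descriptors are produced dynamically.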
