sjvanrossum commented on code in PR #32986:
URL: https://github.com/apache/beam/pull/32986#discussion_r1890224117
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -267,80 +284,121 @@ private static final class SharedStateHolder {
private static class KafkaLatestOffsetEstimator
implements GrowableOffsetRangeTracker.RangeEndEstimator {
- private final Consumer<byte[], byte[]> offsetConsumer;
- private final TopicPartition topicPartition;
- private final Supplier<Long> memoizedBacklog;
+ private final KafkaSourceDescriptor sourceDescriptor;
+ private final LoadingCache<Optional<ImmutableSet<String>>, ConcurrentConsumer<byte[], byte[]>>
+ consumerExecutionContextCache;
+ private @MonotonicNonNull ConcurrentConsumer<byte[], byte[]> consumerExecutionContextInstance;
Review Comment:
The context cache stores its values as weak references, so this field is mainly
there to retain a strong reference to a healthy context. The intent is that a
context becomes eligible for eviction from the cache once the last remaining
bundle processor referring to this `ReadFromKafkaDoFn` instance has been evicted
and eventually collected. That avoids the unnecessary evictions from the context
cache that a time- or size-based eviction policy would cause.
The use of this field is a mess though, so I'll clean that up. 👍
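
To illustrate the idea, here is a minimal JDK-only sketch of a weak-valued cache whose entry is kept alive by a strong reference in a holder object. `WeakValueCache`, `ContextHolder`, and `WeakValueCacheDemo` are hypothetical names made up for this example; they are not classes from the PR, and the PR's `LoadingCache` may behave differently in detail.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a cache that holds its values weakly.
final class WeakValueCache<K, V> {
  private final ConcurrentHashMap<K, WeakReference<V>> entries = new ConcurrentHashMap<>();
  private final Function<K, V> loader;

  WeakValueCache(Function<K, V> loader) {
    this.loader = loader;
  }

  // Returns the cached value if it is still reachable, otherwise loads a new one.
  synchronized V get(K key) {
    WeakReference<V> ref = entries.get(key);
    V value = ref == null ? null : ref.get();
    if (value == null) {
      value = loader.apply(key);
      entries.put(key, new WeakReference<>(value));
    }
    return value;
  }
}

// Hypothetical holder playing the role of the object that owns the field.
final class ContextHolder {
  private final WeakValueCache<String, Object> cache;
  private final String key;
  private Object pinned; // strong reference; keeps the cached value alive

  ContextHolder(WeakValueCache<String, Object> cache, String key) {
    this.cache = cache;
    this.key = key;
  }

  // Looks the context up once, then reuses the pinned instance.
  Object context() {
    if (pinned == null) {
      pinned = cache.get(key);
    }
    return pinned;
  }
}

final class WeakValueCacheDemo {
  public static void main(String[] args) {
    WeakValueCache<String, Object> cache = new WeakValueCache<>(k -> new Object());
    ContextHolder holder = new ContextHolder(cache, "topic-0");
    Object first = holder.context();
    System.gc(); // a hint only; the value is still strongly reachable via the holder
    System.out.println(cache.get("topic-0") == first);
  }
}
```

As long as a holder is reachable, the cache keeps returning the same instance. Once every holder has been collected, the weak reference can be cleared and the next lookup loads a fresh value, with no time- or size-based policy involved.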