sjvanrossum commented on code in PR #32986:
URL: https://github.com/apache/beam/pull/32986#discussion_r1890224117


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -267,80 +284,121 @@ private static final class SharedStateHolder {
   private static class KafkaLatestOffsetEstimator
       implements GrowableOffsetRangeTracker.RangeEndEstimator {
 
-    private final Consumer<byte[], byte[]> offsetConsumer;
-    private final TopicPartition topicPartition;
-    private final Supplier<Long> memoizedBacklog;
+    private final KafkaSourceDescriptor sourceDescriptor;
+    private final LoadingCache<Optional<ImmutableSet<String>>, ConcurrentConsumer<byte[], byte[]>>
+        consumerExecutionContextCache;
+    private @MonotonicNonNull ConcurrentConsumer<byte[], byte[]> consumerExecutionContextInstance;

Review Comment:
   The context cache stores its values as weak references, so this field is
mainly there to retain a strong reference to a healthy context. The intent is
that a context becomes eligible for eviction from the cache only once the last
remaining bundle processor referring to this `ReadFromKafkaDoFn` instance has
been evicted and eventually collected. That avoids the unnecessary evictions
that a time-based or size-based eviction policy would cause.
   
   The use of this field is a mess though so I'll clean that up. 👍 
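   
   For illustration, the pinning pattern described above can be sketched
roughly as follows. This is not the PR's code: it uses plain
`java.lang.ref.WeakReference` and a `HashMap` as a stand-in for the
`LoadingCache` in the diff, and `WeakValueCache` and `ContextHolder` are
hypothetical names introduced only for this sketch.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a cache that only holds its values weakly. */
final class WeakValueCache<K, V> {
  private final Map<K, WeakReference<V>> entries = new HashMap<>();
  private final Function<K, V> loader;

  WeakValueCache(Function<K, V> loader) {
    this.loader = loader;
  }

  /** Returns the live value for the key, loading a new one if none is reachable. */
  synchronized V get(K key) {
    WeakReference<V> ref = entries.get(key);
    V value = ref == null ? null : ref.get();
    if (value == null) {
      // Either never loaded, or the previous value was garbage collected.
      value = loader.apply(key);
      entries.put(key, new WeakReference<>(value));
    }
    return value;
  }
}

/** Hypothetical stand-in for the object that pins a context with a field. */
final class ContextHolder {
  private final WeakValueCache<String, Object> cache;
  // Strong reference: while this holder is reachable, so is the cached value.
  private Object pinned;

  ContextHolder(WeakValueCache<String, Object> cache) {
    this.cache = cache;
  }

  Object context(String key) {
    if (pinned == null) {
      pinned = cache.get(key);
    }
    return pinned;
  }
}
```

   In this sketch the cache entry stays usable for as long as some
`ContextHolder` that pinned it is reachable. Once every such holder has been
collected, the value becomes eligible for collection and the next `get`
loads a fresh one, with no time or size limit involved.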



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.