Abacn commented on code in PR #34331: URL: https://github.com/apache/beam/pull/34331#discussion_r2068707560
########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java: ########## @@ -262,82 +333,133 @@ private static final class SharedStateHolder { * fetch backlog. */ private static class KafkaLatestOffsetEstimator - implements GrowableOffsetRangeTracker.RangeEndEstimator { - + implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable { + private static final AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, @Nullable Runnable> + CURRENT_REFRESH_TASK = + (AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, @Nullable Runnable>) + AtomicReferenceFieldUpdater.newUpdater( + KafkaLatestOffsetEstimator.class, Runnable.class, "currentRefreshTask"); + private final Executor executor; private final Consumer<byte[], byte[]> offsetConsumer; private final TopicPartition topicPartition; - private final Supplier<Long> memoizedBacklog; + private long lastRefreshEndOffset; + private long nextRefreshNanos; + private volatile @Nullable Runnable currentRefreshTask; KafkaLatestOffsetEstimator( - Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) { + final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition topicPartition) { + this.executor = Executors.newSingleThreadExecutor(); this.offsetConsumer = offsetConsumer; this.topicPartition = topicPartition; - memoizedBacklog = - Suppliers.memoizeWithExpiration( - () -> { - synchronized (offsetConsumer) { - return Preconditions.checkStateNotNull( - offsetConsumer - .endOffsets(Collections.singleton(topicPartition)) - .get(topicPartition), - "No end offset found for partition %s.", - topicPartition); - } - }, - 1, - TimeUnit.SECONDS); + this.lastRefreshEndOffset = -1L; + this.nextRefreshNanos = Long.MIN_VALUE; + this.currentRefreshTask = null; } @Override - protected void finalize() { - try { - Closeables.close(offsetConsumer, true); - LOG.info("Offset Estimator consumer was closed for {}", topicPartition); - } catch (Exception anyException) { - LOG.warn("Failed 
to close offset consumer for {}", topicPartition); + public long estimate() { + final @Nullable Runnable task = currentRefreshTask; // volatile load (acquire) Review Comment: I see the conversation is resolved, merging for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org