Abacn commented on code in PR #34331:
URL: https://github.com/apache/beam/pull/34331#discussion_r2068707560


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -262,82 +333,133 @@ private static final class SharedStateHolder {
    * fetch backlog.
    */
   private static class KafkaLatestOffsetEstimator
-      implements GrowableOffsetRangeTracker.RangeEndEstimator {
-
+      implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
+    private static final 
AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, @Nullable Runnable>
+        CURRENT_REFRESH_TASK =
+            (AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, @Nullable 
Runnable>)
+                AtomicReferenceFieldUpdater.newUpdater(
+                    KafkaLatestOffsetEstimator.class, Runnable.class, 
"currentRefreshTask");
+    private final Executor executor;
     private final Consumer<byte[], byte[]> offsetConsumer;
     private final TopicPartition topicPartition;
-    private final Supplier<Long> memoizedBacklog;
+    private long lastRefreshEndOffset;
+    private long nextRefreshNanos;
+    private volatile @Nullable Runnable currentRefreshTask;
 
     KafkaLatestOffsetEstimator(
-        Consumer<byte[], byte[]> offsetConsumer, TopicPartition 
topicPartition) {
+        final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition 
topicPartition) {
+      this.executor = Executors.newSingleThreadExecutor();
       this.offsetConsumer = offsetConsumer;
       this.topicPartition = topicPartition;
-      memoizedBacklog =
-          Suppliers.memoizeWithExpiration(
-              () -> {
-                synchronized (offsetConsumer) {
-                  return Preconditions.checkStateNotNull(
-                      offsetConsumer
-                          .endOffsets(Collections.singleton(topicPartition))
-                          .get(topicPartition),
-                      "No end offset found for partition %s.",
-                      topicPartition);
-                }
-              },
-              1,
-              TimeUnit.SECONDS);
+      this.lastRefreshEndOffset = -1L;
+      this.nextRefreshNanos = Long.MIN_VALUE;
+      this.currentRefreshTask = null;
     }
 
     @Override
-    protected void finalize() {
-      try {
-        Closeables.close(offsetConsumer, true);
-        LOG.info("Offset Estimator consumer was closed for {}", 
topicPartition);
-      } catch (Exception anyException) {
-        LOG.warn("Failed to close offset consumer for {}", topicPartition);
+    public long estimate() {
+      final @Nullable Runnable task = currentRefreshTask; // volatile load 
(acquire)

Review Comment:
   I see the conversion is resolved, merging for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to