sjvanrossum commented on code in PR #32986:
URL: https://github.com/apache/beam/pull/32986#discussion_r1890030711


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -215,20 +233,329 @@ private ReadFromKafkaDoFn(
    * must run clean up tasks when {@link #teardown()} is called.
    */
   private static final class SharedStateHolder {
-
-    private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
-        OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
     private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
         AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+    private static final Map<
+            Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
+        CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>();
+  }
+
+  static final class TopicPartitionPollState implements AutoCloseable {
+    private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = Arrays.asList();
+
+    private final AtomicBoolean closed;
+    private final LinkedTransferQueue<List<ConsumerRecord<byte[], byte[]>>> queue;
+    private final TopicPartition topicPartition;
+    private final OffsetRange offsetRange;
+
+    TopicPartitionPollState(final TopicPartition topicPartition, final OffsetRange offsetRange) {
+      this.closed = new AtomicBoolean();
+      this.queue = new LinkedTransferQueue<>();
+      this.topicPartition = topicPartition;
+      this.offsetRange = offsetRange;
+    }
+
+    TopicPartition getTopicPartition() {
+      return this.topicPartition;
+    }
+
+    OffsetRange getOffsetRange() {
+      return this.offsetRange;
+    }
+
+    boolean hasWaitingConsumer() {
+      return !this.closed.get() && queue.hasWaitingConsumer();
+    }
+
+    Optional<List<ConsumerRecord<byte[], byte[]>>> take() throws InterruptedException {
+      if (this.closed.get()) {
+        return Optional.empty();
+      }
+      return Optional.of(queue.take()).filter(records -> records != CLOSED_SENTINEL);

Review Comment:
   Resolving since this class has been removed in the redesign.
   If I recall correctly, instances of this class would only be shared between 
the Kafka consumer thread and the current SDF thread. Setting `closed` to 
`true` and inserting the sentinel element was intended to unblock queue 
consumers awaiting the result of `take()`. Sharing `CancellableQueue` would 
have been my preferred option if this class had stuck around.
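
   For readers without the removed code at hand, here is a minimal sketch of the closed-flag-plus-sentinel idea described above. It is an illustration only, not the class from the PR: `SentinelQueueSketch`, its generic element type, and the `offer` method are hypothetical names, and it assumes a single blocked consumer, as in the two-thread sharing described in the comment.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   import java.util.concurrent.LinkedTransferQueue;
   import java.util.concurrent.atomic.AtomicBoolean;

   /** Hypothetical stand-in for the removed class; not the Beam implementation. */
   final class SentinelQueueSketch<T> implements AutoCloseable {
     // A dedicated list instance compared by identity, so a legitimately
     // empty batch is never mistaken for the close signal.
     private final List<T> closedSentinel = new ArrayList<>(0);
     private final AtomicBoolean closed = new AtomicBoolean();
     private final LinkedTransferQueue<List<T>> queue = new LinkedTransferQueue<>();

     /** Producer side: enqueue a batch unless the holder is already closed. */
     boolean offer(List<T> batch) {
       return !closed.get() && queue.offer(batch);
     }

     /** Consumer side: blocks for a batch; an empty Optional means closed. */
     Optional<List<T>> take() throws InterruptedException {
       if (closed.get()) {
         return Optional.empty();
       }
       List<T> batch = queue.take();
       return batch == closedSentinel ? Optional.empty() : Optional.of(batch);
     }

     @Override
     public void close() {
       // Only the first close() inserts the sentinel; it wakes a consumer
       // that is already parked inside take().
       if (closed.compareAndSet(false, true)) {
         queue.offer(closedSentinel);
       }
     }
   }
   ```

   A single sentinel wakes at most one waiting consumer, which is enough only under the one-consumer assumption above. With several consumers, each would need its own wake-up signal, which is one reason a shared cancellable queue is the more general option.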



