sjvanrossum commented on code in PR #32986:
URL: https://github.com/apache/beam/pull/32986#discussion_r1890035467


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -215,20 +233,329 @@ private ReadFromKafkaDoFn(
    * must run clean up tasks when {@link #teardown()} is called.
    */
   private static final class SharedStateHolder {
-
-    private static final Map<Long, LoadingCache<KafkaSourceDescriptor, 
KafkaLatestOffsetEstimator>>
-        OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
     private static final Map<Long, LoadingCache<KafkaSourceDescriptor, 
AverageRecordSize>>
         AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+    private static final Map<
+            Long, LoadingCache<Optional<ImmutableSet<String>>, 
ConsumerExecutionContext>>
+        CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>();
+  }
+
+  static final class TopicPartitionPollState implements AutoCloseable {
+    private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL 
= Arrays.asList();
+
+    private final AtomicBoolean closed;
+    private final LinkedTransferQueue<List<ConsumerRecord<byte[], byte[]>>> 
queue;
+    private final TopicPartition topicPartition;
+    private final OffsetRange offsetRange;
+
+    TopicPartitionPollState(final TopicPartition topicPartition, final 
OffsetRange offsetRange) {
+      this.closed = new AtomicBoolean();
+      this.queue = new LinkedTransferQueue<>();
+      this.topicPartition = topicPartition;
+      this.offsetRange = offsetRange;
+    }
+
+    TopicPartition getTopicPartition() {
+      return this.topicPartition;
+    }
+
+    OffsetRange getOffsetRange() {
+      return this.offsetRange;
+    }
+
+    boolean hasWaitingConsumer() {
+      return !this.closed.get() && queue.hasWaitingConsumer();
+    }
+
+    Optional<List<ConsumerRecord<byte[], byte[]>>> take() throws 
InterruptedException {

Review Comment:
   I don't think I have a need for a blocking queue between the Kafka consumer 
and SDF threads at the moment, but I had thought about reusing 
`CancellableQueue` if that exchange pattern stuck around. 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to