sjvanrossum commented on code in PR #32889:
URL: https://github.com/apache/beam/pull/32889#discussion_r1823518401


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -711,23 +710,28 @@ private void setupInitialOffset(PartitionState<K, V> pState) {
   // Called from setupInitialOffset() at the start and then periodically from offsetFetcher thread.
   private void updateLatestOffsets() {
     Consumer<byte[], byte[]> offsetConsumer = Preconditions.checkStateNotNull(this.offsetConsumer);
-    for (PartitionState<K, V> p : partitionStates) {
-      try {
-        Instant fetchTime = Instant.now();
-        ConsumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
-        long offset = offsetConsumer.position(p.topicPartition);
-        p.setLatestOffset(offset, fetchTime);
-      } catch (Exception e) {
-        if (closed.get()) { // Ignore the exception if the reader is closed.
-          break;
-        }
+    List<TopicPartition> topicPartitions =
+        Preconditions.checkStateNotNull(source.getSpec().getTopicPartitions());
+    Instant fetchTime = Instant.now();
+    try {
+      Map<TopicPartition, Long> endOffsets = offsetConsumer.endOffsets(topicPartitions);

Review Comment:
   We don't need them at all, since the offset consumers unconditionally report Kafka's high watermark for the `TopicPartition` as the end offset. Main consumers with `isolation.level` set to `read_committed` only consume up to `min(high watermark, last stable offset)` and cannot make any meaningful progress on the difference between the two.
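   A toy model in plain Java may make the gap concrete. It is not the Kafka client API: the class, methods, and offsets below are hypothetical, and it simply assumes, as described above, that the offset consumer reports the high watermark while a `read_committed` main consumer stops at `min(high watermark, last stable offset)`.

```java
/**
 * Toy model of the end-offset mismatch. NOT the Kafka client API:
 * all names are hypothetical and offsets are plain longs.
 */
public class EndOffsetModel {

  /** Offset a main consumer can actually read up to, given its isolation level. */
  static long readableEnd(long highWatermark, long lastStableOffset, boolean readCommitted) {
    // read_committed consumers stop at min(high watermark, last stable offset).
    return readCommitted ? Math.min(highWatermark, lastStableOffset) : highWatermark;
  }

  /** Backlog as seen by the runner when the offset consumer reports reportedEnd. */
  static long reportedBacklog(long reportedEnd, long position) {
    return Math.max(0L, reportedEnd - position);
  }

  public static void main(String[] args) {
    long highWatermark = 120L;    // includes records of a still-open transaction
    long lastStableOffset = 100L; // first offset of that open transaction

    // A caught-up read_committed main consumer can get no further than the LSO.
    long position = readableEnd(highWatermark, lastStableOffset, true);

    // The offset consumer reports the high watermark, so the reported backlog
    // cannot drain until the transaction is committed or aborted.
    System.out.println(
        "position=" + position + " backlog=" + reportedBacklog(highWatermark, position));
    // prints position=100 backlog=20
  }
}
```

   In this model the main consumer is fully caught up, yet the runner is still told there are 20 records of backlog that no amount of extra work can consume.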
   
   It's not entirely trivial to remove the offset consumer, since some methods of the source or SDF may be called concurrently and Kafka consumers are unfortunately not thread-safe. The rewrite I have in progress, which decouples the lifetime of the consumer from the SDF, lets us remove the offset consumer because the end offset is then updated in the consumer thread and stored in an `AtomicLong`. I think that change alone should be sufficient, but it implies that the consumer thread has full control over when the end offset is updated, so the runner may receive a slightly stale value, although never one less than the current position.
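   A minimal sketch of that idea, assuming a single consumer thread that owns both the position and the end offset and publishes them through `AtomicLong`s, while other threads (for example, the one answering the runner's progress requests) only read them. The class and method names are hypothetical and not taken from the in-progress rewrite, and how the consumer thread learns the end offset is left out. The end offset is raised before the position is published and is never lowered, and readers load the position before the end offset, so a reader may see a stale end offset but never one below the position it just read.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: end offset owned by the consumer thread, read by any thread. */
public class ConsumerThreadEndOffset {
  // Written only by the consumer thread; safe to read from any thread.
  private final AtomicLong position = new AtomicLong(0L);
  private final AtomicLong endOffset = new AtomicLong(0L);

  /** Called only from the consumer thread, e.g. after each poll. */
  void onConsumerThreadProgress(long newPosition, long fetchedEndOffset) {
    // Raise the end offset BEFORE publishing the new position, and never lower it,
    // so any reader that observes newPosition also observes an end offset >= newPosition.
    endOffset.accumulateAndGet(Math.max(newPosition, fetchedEndOffset), Math::max);
    position.set(newPosition);
  }

  /** Safe to call from any thread. */
  long backlog() {
    long pos = position.get();  // read the position first...
    long end = endOffset.get(); // ...then the end offset: possibly stale, but >= pos
    return end - pos;
  }

  public static void main(String[] args) throws InterruptedException {
    ConsumerThreadEndOffset tracker = new ConsumerThreadEndOffset();
    Thread consumerThread =
        new Thread(
            () -> {
              for (long pos = 1; pos <= 100_000L; pos++) {
                // Pretend the end offset is only refreshed every 1000 records, so the
                // fetched value is often stale and even below the current position.
                long fetchedEnd = (pos / 1000) * 1000 + 500;
                tracker.onConsumerThreadProgress(pos, fetchedEnd);
              }
            });
    consumerThread.start();

    // Concurrently poll the backlog, as a runner progress request might.
    long reads = 0;
    long negativeReads = 0;
    while (consumerThread.isAlive()) {
      reads++;
      if (tracker.backlog() < 0) {
        negativeReads++;
      }
    }
    consumerThread.join();
    System.out.println(reads + " concurrent reads, " + negativeReads + " negative backlogs");
  }
}
```

   The number of reads varies from run to run, but the negative count should always be 0: the reported end offset can lag behind the broker, yet it never falls below the position the reader observed.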
   
   I left that change out so this PR doesn't get too cluttered, but it's on my to-do list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
