sjvanrossum commented on code in PR #32889:
URL: https://github.com/apache/beam/pull/32889#discussion_r1823518401
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -711,23 +710,28 @@ private void setupInitialOffset(PartitionState<K, V> pState) {
// Called from setupInitialOffset() at the start and then periodically from offsetFetcher thread.
private void updateLatestOffsets() {
Consumer<byte[], byte[]> offsetConsumer =
Preconditions.checkStateNotNull(this.offsetConsumer);
- for (PartitionState<K, V> p : partitionStates) {
- try {
- Instant fetchTime = Instant.now();
- ConsumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
- long offset = offsetConsumer.position(p.topicPartition);
- p.setLatestOffset(offset, fetchTime);
- } catch (Exception e) {
- if (closed.get()) { // Ignore the exception if the reader is closed.
- break;
- }
+ List<TopicPartition> topicPartitions =
+ Preconditions.checkStateNotNull(source.getSpec().getTopicPartitions());
+ Instant fetchTime = Instant.now();
+ try {
+ Map<TopicPartition, Long> endOffsets =
offsetConsumer.endOffsets(topicPartitions);
Review Comment:
We don't need them at all, since the offset consumers unconditionally report
Kafka's high watermark for the `TopicPartition` as the end offset. Main
consumers with `isolation.level` set to `read_committed` only consume up to
`min(high watermark, last stable offset)`, so they cannot make any meaningful
progress on the difference between the two.
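A minimal sketch of the arithmetic above, using made-up offsets. It assumes the offset consumer reports the high watermark as the end offset and the main consumer uses `read_committed`. `ReadCommittedBacklog` and its methods are hypothetical names for illustration only, not part of KafkaIO or the Kafka client.

```java
/**
 * Hypothetical helper (not part of KafkaIO) showing why the high watermark
 * overstates the work a read_committed consumer can actually do.
 */
final class ReadCommittedBacklog {
  private ReadCommittedBacklog() {}

  /** Furthest offset a read_committed consumer can reach: min(high watermark, last stable offset). */
  static long reachableEnd(long highWatermark, long lastStableOffset) {
    return Math.min(highWatermark, lastStableOffset);
  }

  /** Backlog implied by the end offset the offset consumer reports (the high watermark). */
  static long reportedBacklog(long position, long highWatermark) {
    return Math.max(0L, highWatermark - position);
  }

  /** Backlog the read_committed consumer can actually drain right now. */
  static long consumableBacklog(long position, long highWatermark, long lastStableOffset) {
    return Math.max(0L, reachableEnd(highWatermark, lastStableOffset) - position);
  }
}
```

With a position of 90, a high watermark of 120, and a last stable offset of 100, the reported backlog is 30 but only 10 records can currently be consumed. Once the position reaches 100, the consumable backlog is 0 while the reported backlog is still 20, and it stays that way until the open transaction completes.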
It's not entirely trivial to remove the offset consumer, since some methods
of the source or SDF may be called concurrently and Kafka consumers are
unfortunately not thread-safe. The rewrite I have in progress, which decouples
the lifetime of the consumer from the SDF, lets us remove the offset consumer
because the end offset is updated on the consumer thread and stored in an
`AtomicLong`. I think that change alone should be sufficient, but it means the
consumer thread has full control over when the end offset is updated, so the
runner may receive a slightly stale value, although never one less than the
current position.
I left that change out so this PR doesn't get too cluttered, but it's on my
to-do list.
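A minimal sketch of the `AtomicLong` hand-off described above. It assumes a single consumer thread that publishes its position and the latest end offset it observed after each poll, while other threads only read. `EndOffsetTracker`, `publish`, and `backlog` are hypothetical names. This is not the code of the in-progress rewrite, and keeping the end offset monotonic is a design choice of this sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch, not the in-progress KafkaIO rewrite: only the consumer thread writes,
 * and other threads read these values without touching the non-thread-safe Kafka consumer.
 */
final class EndOffsetTracker {
  private final AtomicLong endOffset = new AtomicLong(0L);
  private final AtomicLong position = new AtomicLong(0L);

  /** Called only from the consumer thread, for example after each poll. */
  void publish(long newPosition, long observedEndOffset) {
    // Publish the end offset first, keeping it monotonic and at least newPosition,
    // so a reader that sees the new position also sees an end offset >= that position.
    endOffset.accumulateAndGet(Math.max(observedEndOffset, newPosition), Math::max);
    position.set(newPosition);
  }

  /** Safe from any thread; the value may be stale (as of the last publish() call). */
  long estimatedEndOffset() {
    return endOffset.get();
  }

  /** Remaining work as seen by another thread; never negative given the write order above. */
  long backlog() {
    long pos = position.get(); // read the position first...
    long end = endOffset.get(); // ...then the end offset, which was published no later
    return end - pos;
  }
}
```

After `publish(10, 25)`, publishing position 30 together with a stale end offset of 25 yields an estimated end offset of 30 and a backlog of 0. The runner may see an outdated end offset, but never one below the position published alongside it.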