nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
}
}
+ // see https://github.com/apache/beam/issues/25962
+ private ConsumerRecords<byte[], byte[]> poll(
+ Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+ final Stopwatch sw = Stopwatch.createStarted();
+ long previousPosition = -1;
+ while (true) {
+ final ConsumerRecords<byte[], byte[]> rawRecords =
+ consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+ if (!rawRecords.isEmpty()) {
+ // return as we have found some entries
+ return rawRecords;
+ }
+ if (previousPosition == (previousPosition = consumer.position(topicPartition))) {
Review Comment:
(end of stream -> no offset change possible)
pre-3.2.0 + EOS -> poll should already block for the timeout and return empty; *both the position check and the timeout check should return*
post-3.2.0 + EOS -> poll should already block for the timeout and return empty; *both the position check and the timeout check should return*
(big gap -> the timeout isn't long enough to iterate over it)
pre-3.2.0 + big gap -> poll should already block for the timeout and return empty; *the timeout check should return*
post-3.2.0 + big gap -> poll returns empty immediately; *every check fails*, so the loop polls again
(small gap -> the timeout is long enough to iterate over it)
pre-3.2.0 + small gap -> poll blocks until it returns a non-empty result; *the non-empty check should return*
post-3.2.0 + small gap -> poll returns empty immediately; *every check fails*, so the loop polls again
So, _theoretically_, there is no scenario in which the position check triggers the return and no other check does.
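The case analysis above can be checked against a toy model. The Java sketch below is hypothetical throughout: `SimulatedPartition`, `Check`, the virtual clock and the constants (1000 ms standing in for `KAFKA_POLL_TIMEOUT`, 10 offsets skipped per fetch, 1 ms per skipped offset) are assumptions, not Kafka or Beam APIs. The `pre320` flag switches between the two `poll` behaviours the comment describes for clients before and after 3.2.0; they are modelled, not measured. The timeout check is also assumed, since it is not visible in the quoted hunk. The loop mirrors the diff (including `previousPosition` starting at -1) but evaluates all three checks at each exit instead of returning at the first one, so it reports which of them hold.

```java
import java.util.EnumSet;
import java.util.Set;

/** Toy model of the poll loop in the diff; every name here is hypothetical. */
public class PollLoopSketch {

  static final long TIMEOUT_MS = 1_000; // stands in for KAFKA_POLL_TIMEOUT (assumed value)
  static final long FETCH_BATCH = 10; // offsets skipped per fetch (assumed)
  static final long MS_PER_OFFSET = 1; // virtual cost of skipping one offset (assumed)

  /** Exit conditions that can hold when the loop stops. */
  enum Check { NON_EMPTY, POSITION_UNCHANGED, TIMEOUT }

  /** Partition whose offsets [0, gapEnd) hold no returnable records; runs on a virtual clock. */
  static final class SimulatedPartition {
    final boolean pre320; // true: one poll keeps skipping until data arrives or time runs out
    final long gapEnd;
    final boolean endOfStream; // true: nothing to read at or after gapEnd
    long position = 0;
    long nowMs = 0;

    SimulatedPartition(boolean pre320, long gapEnd, boolean endOfStream) {
      this.pre320 = pre320;
      this.gapEnd = gapEnd;
      this.endOfStream = endOfStream;
    }

    /** Returns the number of records fetched (0 or 1) and advances the virtual clock. */
    int poll(long timeoutMs) {
      long deadline = nowMs + timeoutMs;
      while (true) {
        if (position >= gapEnd) {
          if (!endOfStream) {
            return 1; // data is available right away
          }
          nowMs = deadline; // nothing to read: block for the whole timeout
          return 0;
        }
        long step = Math.min(FETCH_BATCH, gapEnd - position);
        if (nowMs + step * MS_PER_OFFSET > deadline) {
          position += (deadline - nowMs) / MS_PER_OFFSET; // skip what fits, then give up
          nowMs = deadline;
          return 0;
        }
        position += step;
        nowMs += step * MS_PER_OFFSET;
        if (!pre320 && position < gapEnd) {
          return 0; // post-3.2.0 model: return empty right after an empty fetch
        }
      }
    }
  }

  /** Mirrors the loop in the diff plus an assumed timeout check; reports every check that holds. */
  static Set<Check> runLoop(SimulatedPartition consumer) {
    long startMs = consumer.nowMs;
    long previousPosition = -1; // same initial value as in the diff
    while (true) {
      int records = consumer.poll(TIMEOUT_MS - (consumer.nowMs - startMs));
      Set<Check> fired = EnumSet.noneOf(Check.class);
      if (records > 0) {
        fired.add(Check.NON_EMPTY);
      }
      if (previousPosition == consumer.position) {
        fired.add(Check.POSITION_UNCHANGED);
      }
      if (consumer.nowMs - startMs >= TIMEOUT_MS) {
        fired.add(Check.TIMEOUT);
      }
      if (!fired.isEmpty()) {
        return fired;
      }
      previousPosition = consumer.position; // every check failed: poll again
    }
  }

  static Set<Check> scenario(boolean pre320, long gapEnd, boolean endOfStream) {
    return runLoop(new SimulatedPartition(pre320, gapEnd, endOfStream));
  }

  public static void main(String[] args) {
    for (boolean pre320 : new boolean[] {true, false}) {
      String client = pre320 ? "pre-3.2.0 " : "post-3.2.0";
      System.out.println(client + " EOS:       " + scenario(pre320, 0, true));
      System.out.println(client + " big gap:   " + scenario(pre320, 100_000, false));
      System.out.println(client + " small gap: " + scenario(pre320, 100, false));
    }
  }
}
```

In this model `main` prints `[TIMEOUT]` for both end-of-stream and both big-gap cases and `[NON_EMPTY]` for both small-gap cases, so the position check never fires on its own. One difference from the case list: because `previousPosition` starts at -1, the position check cannot hold on the first iteration, and in the end-of-stream case that first poll already uses up the whole timeout, so the model exits through the timeout check alone rather than through both checks.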
--
This is an automated message from the Apache Git Service.