scwhittle commented on code in PR #32456: URL: https://github.com/apache/beam/pull/32456#discussion_r1762879402
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########

@@ -461,6 +461,11 @@ public ProcessContinuation processElement(
       return ProcessContinuation.resume();
     }
     for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+      // If the Kafka consumer returns a record with an offset that is already processed
+      // the record can be safely skipped.
+      if (rawRecord.offset() < startOffset) {
+        continue;

Review Comment:
   ProcessContinuation.resume() will start again from the same restriction at a later point: the bundle stops processing and the runner reschedules the restriction for later. Even without a delay this gives other processing a chance to start, but a delay is probably desirable in this case because we observed it took several minutes to resolve. So if the seek never works, it would be a problem; but if the seek eventually works (assuming there is some delay in noticing the partition offsets), this would be a way to wait for it without skipping/reading a bunch of events.

   +1 to putting the logic that handles the failed seek (either a resume or skipping records) right after the seek itself instead of in the main read loop.

   Perhaps we should read and skip, counting how many records we skipped, and log a warning that X records were read/skipped because the seek did not return to the same position. And if we do that for 10 seconds or so without reaching the start offset, we can stop and return a ProcessContinuation.resume() with a delay.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
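[Editor's note] The read-and-skip fallback the reviewer proposes can be sketched in plain Java. This is a hedged illustration only: `Outcome`, `skipStaleRecords`, and the use of raw offsets in place of Kafka `ConsumerRecord`s are hypothetical stand-ins, not Beam or Kafka API; in the real DoFn the outcome would map to continuing the read loop vs. returning `ProcessContinuation.resume()` with a delay.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.logging.Logger;

public class SeekRecoverySketch {
  private static final Logger LOG = Logger.getLogger(SeekRecoverySketch.class.getName());

  // Hypothetical stand-in for Beam's ProcessContinuation outcomes.
  enum Outcome { CONTINUE, RESUME_WITH_DELAY }

  /**
   * After a seek that did not land on startOffset, read and skip stale records,
   * counting them. If the bounded wait (deadline) expires before we reach
   * startOffset, give up so the caller can resume with a delay.
   */
  static Outcome skipStaleRecords(
      List<Long> polledOffsets, long startOffset, Instant deadline, long[] skippedOut) {
    long skipped = 0;
    for (long offset : polledOffsets) {
      if (offset >= startOffset) {
        break; // consumer has caught up to the restriction's start offset
      }
      skipped++;
      if (Instant.now().isAfter(deadline)) {
        skippedOut[0] = skipped;
        return Outcome.RESUME_WITH_DELAY; // stop; runner reschedules the restriction later
      }
    }
    if (skipped > 0) {
      LOG.warning(skipped + " records read/skipped because seek did not return to the start offset");
    }
    skippedOut[0] = skipped;
    return Outcome.CONTINUE;
  }

  public static void main(String[] args) {
    long[] skipped = new long[1];
    // Offsets 5, 6, 7 are stale (below the start offset 10) and get skipped.
    Outcome o = skipStaleRecords(
        List.of(5L, 6L, 7L, 10L, 11L), 10L,
        Instant.now().plus(Duration.ofSeconds(10)), skipped);
    System.out.println(o + " after skipping " + skipped[0]); // CONTINUE after skipping 3
  }
}
```

The deadline check sits inside the skip loop so a consumer that keeps returning stale records cannot pin the bundle indefinitely, matching the "do that for 10 seconds and then resume with a delay" suggestion above.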