Dippatel98 commented on code in PR #32456:
URL: https://github.com/apache/beam/pull/32456#discussion_r1768853704
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,28 @@ public ProcessContinuation processElement(
         }
         return ProcessContinuation.resume();
       }
+      long skippedRecords = 0L;
+      final Stopwatch sw = Stopwatch.createStarted();
       for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+        // If the Kafka consumer returns a record with an offset that has already been
+        // processed, the record can be safely skipped. This is needed because there is
+        // a possibility that the seek() above fails to move the offset to the desired
+        // position, in which case poll() would return records that are already consumed.
+        if (rawRecord.offset() < startOffset) {
+          // If the start offset is not reached even after skipping records for 10 seconds,
+          // then processing is stopped with a backoff to give the Kafka server some time
+          // to catch up.
+          if (sw.elapsed().getSeconds() > 10L) {
+            LOG.error(
+                "The expected offset was not reached even after skipping consumed records for 10 seconds,"

Review Comment:
   Done.

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,28 @@ public ProcessContinuation processElement(
         }
         return ProcessContinuation.resume();
       }
+      long skippedRecords = 0L;
+      final Stopwatch sw = Stopwatch.createStarted();
       for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+        // If the Kafka consumer returns a record with an offset that has already been
+        // processed, the record can be safely skipped. This is needed because there is
+        // a possibility that the seek() above fails to move the offset to the desired
+        // position, in which case poll() would return records that are already consumed.
+        if (rawRecord.offset() < startOffset) {
+          // If the start offset is not reached even after skipping records for 10 seconds,
+          // then processing is stopped with a backoff to give the Kafka server some time
+          // to catch up.
+          if (sw.elapsed().getSeconds() > 10L) {
+            LOG.error(
+                "The expected offset was not reached even after skipping consumed records for 10 seconds,"
+                    + " the processing of this bundle will be attempted at a later time.");
+            return ProcessContinuation.resume()
+                .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
+          } else {
+            skippedRecords++;
+            continue;
+          }
+        }
         if (!tracker.tryClaim(rawRecord.offset())) {

Review Comment:
   Moved the log here, but I don't think the if statement is needed, since skippedRecords will always be > 0 by the time it reaches the logging statement.

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -511,6 +532,9 @@ public ProcessContinuation processElement(
           }
         }
       }
+      LOG.warn(
+          "{} records were skipped because of seek returning an earlier position.",

Review Comment:
   Done.

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -460,7 +460,28 @@ public ProcessContinuation processElement(
         }
         return ProcessContinuation.resume();
       }
+      long skippedRecords = 0L;
+      final Stopwatch sw = Stopwatch.createStarted();

Review Comment:
   Done.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
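For readers following the thread without the full file at hand, below is a minimal, self-contained sketch of the skip-and-backoff pattern the diff above implements. It is a sketch under stated assumptions, not the Beam code: the class name, the SKIP_BUDGET constant, the boolean return value (standing in for ProcessContinuation.resume().withResumeDelay(...)), and the use of java.time in place of Guava's Stopwatch are all illustrative; only startOffset, skippedRecords, and the 10-second budget come from the diff.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SkipConsumedRecordsSketch {

  // Mirrors the 10-second budget in the PR; the constant name is hypothetical.
  private static final Duration SKIP_BUDGET = Duration.ofSeconds(10);

  /**
   * Skips records whose offsets precede startOffset. Returns true when the budget
   * is exhausted, meaning the caller should back off and retry the bundle later
   * (the diff does this via ProcessContinuation.resume().withResumeDelay(...)).
   */
  static boolean skipAlreadyConsumed(
      List<ConsumerRecord<byte[], byte[]>> rawRecords, long startOffset) {
    long skippedRecords = 0L;
    Instant start = Instant.now(); // stands in for Guava's Stopwatch in the diff
    for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
      if (rawRecord.offset() < startOffset) {
        // seek() may not have taken effect, so poll() can hand back records the
        // pipeline has already processed; drop them instead of re-emitting.
        if (Duration.between(start, Instant.now()).compareTo(SKIP_BUDGET) > 0) {
          return true; // budget spent: let the broker catch up, retry later
        }
        skippedRecords++;
        continue;
      }
      // Normal processing (tracker.tryClaim(...) etc.) would happen here.
    }
    if (skippedRecords > 0) {
      System.out.printf(
          "%d records were skipped because seek returned an earlier position.%n",
          skippedRecords);
    }
    return false;
  }
}
```

The design point under discussion is that returning a resume-with-delay instead of spinning hands control back to the runner, which reschedules the bundle after the delay and gives the broker time to catch up with the seek().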