scwhittle commented on code in PR #32456:
URL: https://github.com/apache/beam/pull/32456#discussion_r1762879402


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -461,6 +461,11 @@ public ProcessContinuation processElement(
           return ProcessContinuation.resume();
         }
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is 
already processed
+          // the record can be safely skipped.
+          if (rawRecord.offset() < startOffset) {
+            continue;

Review Comment:
   ProcessContinuation.resume() will start again from the same restriction at a 
later point.  The bundle stops processing and the runner will reschedule the 
restriction for processing later. Even without a delay it gives a chance for 
other processing to start, but a delay in this case is probably desirable 
because we observed it took several minutes to resolve.
   
   So if the seek never works, it would be a problem, but if the seek 
eventually works (assuming it is some delay in noticing the partition offsets) 
that would be a way to wait for it without skipping/reading a bunch of events.
   
   +1 to putting the logic to handle the failed seek (either a resume or 
skipping records) right after the seek itself instead of in the main read loop. 
Perhaps we should read and skip, counting how many we skipped and log a warning 
that X records were read/skipped due to seek not returning to the same 
poisition.  And if we do that for 10 seconds or something and don't reach the 
start offset we can stop and return a PRocessingContinuation.resume() with a 
delay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to