scwhittle commented on code in PR #34201:
URL: https://github.com/apache/beam/pull/34201#discussion_r2095586246


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -173,19 +172,6 @@ public boolean advance() throws IOException {
         elementsReadBySplit.inc();
 
         ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-        long expected = pState.nextOffset;
-        long offset = rawRecord.offset();
-
-        if (offset < expected) { // -- (a)
-          // this can happen when compression is enabled in Kafka (seems to be 
fixed in 0.10)

Review Comment:
   Thanks for checking!



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -627,29 +626,6 @@ public ProcessContinuation processElement(
         // Visible progress within the consumer polling timeout.
         // Partially or fully claim and process records in this batch.
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-          // If the Kafka consumer returns a record with an offset that is 
already processed
-          // the record can be safely skipped. This is needed because there is 
a possibility
-          // that the seek() above fails to move the offset to the desired 
position. In which
-          // case poll() would return records that are already cnsumed.
-          if (rawRecord.offset() < startOffset) {
-            // If the start offset is not reached even after skipping the 
records for 10 seconds
-            // then the processing is stopped with a backoff to give the Kakfa 
server some time
-            // catch up.
-            if (sw.elapsed().getSeconds() > 10L) {
-              LOG.error(
-                  "The expected offset ({}) was not reached even after"
-                      + " skipping consumed records for 10 seconds. The offset 
we could"
-                      + " reach was {}. The processing of this bundle will be 
attempted"
-                      + " at a later time.",
-                  expectedOffset,
-                  rawRecord.offset());
-              consumer.pause(Collections.singleton(topicPartition));
-              return ProcessContinuation.resume()
-                  
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
-            }
-            skippedRecords++;
-            continue;
-          }
           if (skippedRecords > 0L) {

Review Comment:
   can remove skippedRecords now too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to