MeghaUpadhyay commented on a change in pull request #2721: Add offset look-back option in Kafka consumer URL: https://github.com/apache/incubator-gobblin/pull/2721#discussion_r317401628
########## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ########## @@ -451,7 +459,21 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt LOG.warn( offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset()); offsets.startAtEarliestOffset(); - } else { + } else if (offsetOption.equals(OFFSET_LOOKBACK)) { + long lookbackOffsetRange = state.getPropAsLong("kafka.offset.lookback", 0L); + long offset = offsets.getLatestOffset() - lookbackOffsetRange; + LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + offsets.getLatestOffset() + " - " + lookbackOffsetRange + " ] start offset: " + offset); + try { + offsets.startAt(offset); + } catch (StartOffsetOutOfRangeException e) { + String offsetOutOfRangeMsg = String.format( + "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.", + partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset()); + LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset()); Review comment: Sure, I have made the changes to use the same functionality. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services