autumnust commented on a change in pull request #2721: Add offset look-back option in Kafka consumer
URL: https://github.com/apache/incubator-gobblin/pull/2721#discussion_r317379980
 
 

 ##########
 File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 ##########
 @@ -451,7 +459,21 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceSt
         LOG.warn(
             offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
         offsets.startAtEarliestOffset();
 -      } else {
 +      } else if (offsetOption.equals(OFFSET_LOOKBACK)) {
 +        long lookbackOffsetRange = state.getPropAsLong("kafka.offset.lookback", 0L);
 +        long offset = offsets.getLatestOffset() - lookbackOffsetRange;
 +        LOG.warn(offsetNotFoundMsg + "This partition will start from latest-lookback [ " + offsets.getLatestOffset() + " - " + lookbackOffsetRange + " ]  start offset: " + offset);
 +        try {
 +          offsets.startAt(offset);
 +        } catch (StartOffsetOutOfRangeException e) {
 +          String offsetOutOfRangeMsg = String.format(
 +                  "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
 +                  partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
 +          LOG.warn(offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
 
 Review comment:
   `KafkaSource` already has a configuration, `RESET_ON_OFFSET_OUT_OF_RANGE`, 
that determines what happens when the start offset is out of range. For 
consistency, let's use that configuration here to decide the fallback instead 
of always starting from the latest offset.
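
   To make the suggestion concrete, here is a minimal, standalone sketch of a 
fallback driven by a reset option rather than hard-coded to the latest offset. 
The class name, the method name, and the option values ("latest", "earliest", 
"nearest", anything else meaning skip) are assumptions for illustration only; 
they are not taken from this PR and may not match the constants in 
`KafkaSource`.

```java
import java.util.Locale;
import java.util.OptionalLong;

// Hypothetical helper, not part of Gobblin: picks a start offset when the
// requested one (e.g. latest - lookback) falls outside [earliest, latest].
final class OffsetResetSketch {
  // Assumed option values; the real ones are defined in KafkaSource.
  static final String LATEST = "latest";
  static final String EARLIEST = "earliest";
  static final String NEAREST = "nearest";

  private OffsetResetSketch() {}

  /**
   * Returns the offset to start from, or an empty value if the partition
   * should be skipped because the reset option names no known strategy.
   */
  static OptionalLong resolveStartOffset(String resetOption, long requested,
      long earliest, long latest) {
    // In range: nothing to reset.
    if (requested >= earliest && requested <= latest) {
      return OptionalLong.of(requested);
    }
    String option = resetOption.toLowerCase(Locale.ROOT);
    // "latest", or "nearest" when the request overshoots the end of the log.
    if (option.equals(LATEST) || (option.equals(NEAREST) && requested > latest)) {
      return OptionalLong.of(latest);
    }
    // "earliest", or "nearest" when the request is older than retained data.
    if (option.equals(EARLIEST) || option.equals(NEAREST)) {
      return OptionalLong.of(earliest);
    }
    // Any other value: the caller skips this partition.
    return OptionalLong.empty();
  }
}
```

   With a look-back larger than the retained range, `latest - lookback` falls 
below the earliest offset, so under the assumed "nearest" option this sketch 
would start from the earliest offset instead of the latest.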

