This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fc2f62  [GOBBLIN-873] Add offset look-back option in Kafka consumer
2fc2f62 is described below

commit 2fc2f62d679b279cbb6d066e6ec378a72037f23a
Author: meupadhyay <[email protected]>
AuthorDate: Sun Sep 8 22:55:53 2019 -0700

    [GOBBLIN-873] Add offset look-back option in Kafka consumer
    
    Closes #2721 from MeghaUpadhyay/master
---
 .../extractor/extract/kafka/KafkaSource.java       | 51 ++++++++++++++++++++--
 .../main/resources/templates/kafka-hdfs.template   |  1 +
 2 files changed, 49 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index f8c6f4a..62485f0 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -90,7 +90,9 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
   public static final String LATEST_OFFSET = "latest";
   public static final String EARLIEST_OFFSET = "earliest";
   public static final String NEAREST_OFFSET = "nearest";
+  public static final String OFFSET_LOOKBACK = "offset_lookback";
   public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
+  public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
   public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
   public static final String TOPICS_MOVE_TO_LATEST_OFFSET = 
"topics.move.to.latest.offset";
   public static final String RESET_ON_OFFSET_OUT_OF_RANGE = 
"reset.on.offset.out.of.range";
@@ -440,8 +442,14 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
       offsets.startAtLatestOffset();
     } else if (previousOffsetNotFound) {
 
-      // When previous offset cannot be found, either start at earliest offset 
or latest offset, or skip the partition
-      // (no need to create an empty workunit in this case since there's no 
offset to persist).
+      /**
+       * When previous offset cannot be found, either start at earliest 
offset, latest offset, go back with (latest - lookback)
+       * (long value to be deducted from latest offset in order to avoid data 
loss) or skip the partition
+       * (no need to create an empty workunit in this case since there's no 
offset to persist).
+       * In case of no previous state OFFSET_LOOKBACK will make sure to avoid 
consuming huge amount of data (earliest offset) and data loss (latest offset)
+       * lookback can be set to any long value where (latest-lookback) is 
nearest offset for each partition. If computed offset is out of range then
+       * partition will start at the latest, earliest or nearest offset, or be skipped, as configured by reset.on.offset.out.of.range
+       **/
       String offsetNotFoundMsg = String.format("Previous offset for partition 
%s does not exist. ", partition);
       String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, 
DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
       if (offsetOption.equals(LATEST_OFFSET)) {
@@ -451,7 +459,44 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
         LOG.warn(
             offsetNotFoundMsg + "This partition will start from the earliest 
offset: " + offsets.getEarliestOffset());
         offsets.startAtEarliestOffset();
-      } else {
+      } else if (offsetOption.equals(OFFSET_LOOKBACK)) {
+        long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK , 
0L);
+        long latestOffset = offsets.getLatestOffset();
+        long offset = latestOffset - lookbackOffsetRange;
+        LOG.warn(offsetNotFoundMsg + "This partition will start from 
latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ]  start 
offset: " + offset);
+        try {
+          offsets.startAt(offset);
+        } catch (StartOffsetOutOfRangeException e) {
+          // Increment counts, which will be reported as job metrics
+          if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
+            this.offsetTooEarlyCount.incrementAndGet();
+          } else {
+            this.offsetTooLateCount.incrementAndGet();
+          }
+
+          // When above computed offset (latest-lookback) is out of range, 
either start at earliest, latest or nearest offset, or skip the
+          // partition. If skipping, need to create an empty workunit so that 
previousOffset is persisted.
+          String offsetOutOfRangeMsg = String.format(
+                  "Start offset for partition %s is out of range. Start offset 
= %d, earliest offset = %d, latest offset = %d.",
+                  partition, offsets.getStartOffset(), 
offsets.getEarliestOffset(), offsets.getLatestOffset());
+          offsetOption =
+                  state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, 
DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
+          if (offsetOption.equals(LATEST_OFFSET) || 
(offsetOption.equals(NEAREST_OFFSET)
+                  && offsets.getStartOffset() >= offsets.getLatestOffset())) {
+            LOG.warn(
+                    offsetOutOfRangeMsg + "This partition will start from the 
latest offset: " + offsets.getLatestOffset());
+            offsets.startAtLatestOffset();
+          } else if (offsetOption.equals(EARLIEST_OFFSET) || 
offsetOption.equals(NEAREST_OFFSET)) {
+            LOG.warn(offsetOutOfRangeMsg + "This partition will start from the 
earliest offset: " + offsets
+                    .getEarliestOffset());
+            offsets.startAtEarliestOffset();
+          } else {
+            LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
+            return createEmptyWorkUnit(partition, previousOffset, 
previousOffsetFetchEpochTime, topicSpecificState);
+          }
+        }
+      }
+      else {
         LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
         return null;
       }
diff --git a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template 
b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
index 773f848..5aad7a6 100644
--- a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
+++ b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
@@ -43,6 +43,7 @@ source.kafka.json.schema=${kafka.schema.json}
 
 topic.whitelist=${kafka.topics}
 
org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
+# Accepted values - latest, earliest and offset_lookback (start at latest offset - 
kafka.offset.lookback)
 bootstrap.with.offset=earliest
 kafka.workunit.packer.type=SINGLE_LEVEL
 mr.job.max.mappers=10

Reply via email to