This is an automated email from the ASF dual-hosted git repository.
shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2fc2f62 [GOBBLIN-873] Add offset look-back option in Kafka consumer
2fc2f62 is described below
commit 2fc2f62d679b279cbb6d066e6ec378a72037f23a
Author: meupadhyay <[email protected]>
AuthorDate: Sun Sep 8 22:55:53 2019 -0700
[GOBBLIN-873] Add offset look-back option in Kafka consumer
Closes #2721 from MeghaUpadhyay/master
---
.../extractor/extract/kafka/KafkaSource.java | 51 ++++++++++++++++++++--
.../main/resources/templates/kafka-hdfs.template | 1 +
2 files changed, 49 insertions(+), 3 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index f8c6f4a..62485f0 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -90,7 +90,9 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
public static final String LATEST_OFFSET = "latest";
public static final String EARLIEST_OFFSET = "earliest";
public static final String NEAREST_OFFSET = "nearest";
+ public static final String OFFSET_LOOKBACK = "offset_lookback";
public static final String BOOTSTRAP_WITH_OFFSET = "bootstrap.with.offset";
+ public static final String KAFKA_OFFSET_LOOKBACK = "kafka.offset.lookback";
public static final String DEFAULT_BOOTSTRAP_WITH_OFFSET = LATEST_OFFSET;
public static final String TOPICS_MOVE_TO_LATEST_OFFSET =
"topics.move.to.latest.offset";
public static final String RESET_ON_OFFSET_OUT_OF_RANGE =
"reset.on.offset.out.of.range";
@@ -440,8 +442,14 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
offsets.startAtLatestOffset();
} else if (previousOffsetNotFound) {
- // When previous offset cannot be found, either start at earliest offset
or latest offset, or skip the partition
- // (no need to create an empty workunit in this case since there's no
offset to persist).
+ /**
+ * When previous offset cannot be found, either start at earliest
offset, latest offset, go back with (latest - lookback)
+ * (long value to be deducted from latest offset in order to avoid data
loss) or skip the partition
+ * (no need to create an empty workunit in this case since there's no
offset to persist).
+ * In case of no previous state, OFFSET_LOOKBACK helps avoid both
consuming a huge amount of data (earliest) and data loss (latest offset).
+ * lookback can be set to any long value; (latest - lookback) is used as the
start offset for each partition. If the computed offset is out of range, the
partition falls back to the reset.on.offset.out.of.range policy
+ **/
String offsetNotFoundMsg = String.format("Previous offset for partition
%s does not exist. ", partition);
String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET,
DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET)) {
@@ -451,7 +459,44 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
LOG.warn(
offsetNotFoundMsg + "This partition will start from the earliest
offset: " + offsets.getEarliestOffset());
offsets.startAtEarliestOffset();
- } else {
+ } else if (offsetOption.equals(OFFSET_LOOKBACK)) {
+ long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK ,
0L);
+ long latestOffset = offsets.getLatestOffset();
+ long offset = latestOffset - lookbackOffsetRange;
+ LOG.warn(offsetNotFoundMsg + "This partition will start from
latest-lookback [ " + latestOffset + " - " + lookbackOffsetRange + " ] start
offset: " + offset);
+ try {
+ offsets.startAt(offset);
+ } catch (StartOffsetOutOfRangeException e) {
+ // Increment counts, which will be reported as job metrics
+ if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
+ this.offsetTooEarlyCount.incrementAndGet();
+ } else {
+ this.offsetTooLateCount.incrementAndGet();
+ }
+
+ // When above computed offset (latest-lookback) is out of range,
either start at earliest, latest or nearest offset, or skip the
+ // partition. If skipping, need to create an empty workunit so that
previousOffset is persisted.
+ String offsetOutOfRangeMsg = String.format(
+ "Start offset for partition %s is out of range. Start offset
= %d, earliest offset = %d, latest offset = %d.",
+ partition, offsets.getStartOffset(),
offsets.getEarliestOffset(), offsets.getLatestOffset());
+ offsetOption =
+ state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE,
DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
+ if (offsetOption.equals(LATEST_OFFSET) ||
(offsetOption.equals(NEAREST_OFFSET)
+ && offsets.getStartOffset() >= offsets.getLatestOffset())) {
+ LOG.warn(
+ offsetOutOfRangeMsg + "This partition will start from the
latest offset: " + offsets.getLatestOffset());
+ offsets.startAtLatestOffset();
+ } else if (offsetOption.equals(EARLIEST_OFFSET) ||
offsetOption.equals(NEAREST_OFFSET)) {
+ LOG.warn(offsetOutOfRangeMsg + "This partition will start from the
earliest offset: " + offsets
+ .getEarliestOffset());
+ offsets.startAtEarliestOffset();
+ } else {
+ LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
+ return createEmptyWorkUnit(partition, previousOffset,
previousOffsetFetchEpochTime, topicSpecificState);
+ }
+ }
+ }
+ else {
LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
return null;
}
diff --git a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
index 773f848..5aad7a6 100644
--- a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
+++ b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
@@ -43,6 +43,7 @@ source.kafka.json.schema=${kafka.schema.json}
topic.whitelist=${kafka.topics}
org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
+# Accepted values - latest, earliest and offset_lookback (start at latest
offset minus kafka.offset.lookback)
bootstrap.with.offset=earliest
kafka.workunit.packer.type=SINGLE_LEVEL
mr.job.max.mappers=10