[GOBBLIN-408] Add more info to the KafkaExtractorTopicMetadata event for tracking execution times and rates
Closes #2285 from htran1/kafka_load_factor

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a3189d73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a3189d73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a3189d73

Branch: refs/heads/0.12.0
Commit: a3189d73360c13412d91d42bea05f6ded1e4006a
Parents: 11182dc
Author: Hung Tran <[email protected]>
Authored: Tue Feb 20 11:16:32 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Feb 20 11:16:32 2018 -0800

----------------------------------------------------------------------
 .../extractor/extract/kafka/KafkaExtractor.java | 81 ++++++++++++++++--
 .../extractor/extract/kafka/KafkaSource.java    | 87 ++++++++++++++++++--
 .../workunit/packer/KafkaWorkUnitPacker.java    | 18 ++++
 3 files changed, 173 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 1ff0159..0ec3caf 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -69,7 +69,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   public static final String LOW_WATERMARK = "lowWatermark";
   public static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
   public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
+  public static final String ELAPSED_TIME = "elapsedTime";
+  public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
   public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
+  public static final String READ_RECORD_TIME = "readRecordTime";
+  public static final String DECODE_RECORD_TIME = "decodeRecordTime";
+  public static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
   public static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
   public static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";

@@ -87,6 +92,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   protected final Map<KafkaPartition, Integer> decodingErrorCount;
   private final Map<KafkaPartition, Double> avgMillisPerRecord;
   private final Map<KafkaPartition, Long> avgRecordSizes;
+  private final Map<KafkaPartition, Long> elapsedTime;
+  private final Map<KafkaPartition, Long> processedRecordCount;
+  private final Map<KafkaPartition, Long> decodeRecordTime;
+  private final Map<KafkaPartition, Long> fetchMessageBufferTime;
+  private final Map<KafkaPartition, Long> readRecordTime;

   private final Set<Integer> errorPartitions;
   private int undecodableMessageCount = 0;
@@ -95,6 +105,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   private int currentPartitionIdx = INITIAL_PARTITION_IDX;
   private long currentPartitionRecordCount = 0;
   private long currentPartitionTotalSize = 0;
+  private long currentPartitionFetchDuration = 0;
+  private long currentPartitionDecodeRecordTime = 0;
+  private long currentPartitionFetchMessageBufferTime = 0;
+  private long currentPartitionReadRecordTime = 0;

   public KafkaExtractor(WorkUnitState state) {
     super(state);
@@ -121,6 +135,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.decodingErrorCount = Maps.newHashMap();
     this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
+    this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());

     this.errorPartitions = Sets.newHashSet();

@@ -142,6 +161,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   @SuppressWarnings("unchecked")
   @Override
   public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+    long readStartTime = System.nanoTime();
+
     while (!allPartitionsFinished()) {
       if (currentPartitionFinished()) {
         moveToNextPartition();
@@ -149,7 +170,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       }
       if (this.messageIterator == null || !this.messageIterator.hasNext()) {
         try {
+          long fetchStartTime = System.nanoTime();
           this.messageIterator = fetchNextMessageBuffer();
+          this.currentPartitionFetchMessageBufferTime += System.nanoTime() - fetchStartTime;
         } catch (Exception e) {
           LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
               getCurrentPartition()), e);
@@ -178,6 +201,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       this.nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
       try {
         D record = null;
+        // track time for decode/convert depending on the record type
+        long decodeStartTime = System.nanoTime();
+
         if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
           record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
         } else if (nextValidMessage instanceof DecodeableKafkaRecord){
@@ -194,8 +220,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
               + " or DecodeableKafkaRecord");
         }

+        this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
         this.currentPartitionRecordCount++;
         this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
+        this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
         return record;
       } catch (Throwable t) {
         this.errorPartitions.add(this.currentPartitionIdx);
@@ -208,6 +236,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       }
     }
     LOG.info("Finished pulling topic " + this.topicName);
+
+    this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
     return null;
   }

@@ -235,10 +265,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       LOG.info("Pulling topic " + this.topicName);
       this.currentPartitionIdx = 0;
     } else {
-      computeAvgMillisPerRecordForCurrentPartition();
+      updateStatisticsForCurrentPartition();
       this.currentPartitionIdx++;
       this.currentPartitionRecordCount = 0;
-      this.currentPartitionTotalSize = 0;
+      this.currentPartitionFetchDuration = 0;
+      this.currentPartitionDecodeRecordTime = 0;
+      this.currentPartitionFetchMessageBufferTime = 0;
+      this.currentPartitionReadRecordTime = 0;
     }

     this.messageIterator = null;
@@ -251,16 +284,25 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.stopwatch.start();
   }

-  private void computeAvgMillisPerRecordForCurrentPartition() {
+  private void updateStatisticsForCurrentPartition() {
     this.stopwatch.stop();
+
     if (this.currentPartitionRecordCount != 0) {
+      this.currentPartitionFetchDuration = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
       double avgMillisForCurrentPartition =
-          (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
+          (double) this.currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
       this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
       long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
       this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
+
+      this.elapsedTime.put(this.getCurrentPartition(), this.currentPartitionFetchDuration);
+      this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
+      this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
+      this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
+      this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
     }
+
     this.stopwatch.reset();
   }

@@ -317,7 +359,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {

   @Override
   public void close() throws IOException {
-    computeAvgMillisPerRecordForCurrentPartition();
+    updateStatisticsForCurrentPartition();

     Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
@@ -336,7 +378,36 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
       tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(i)));
       tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(i)));
+      // These are used to compute the load factor,
+      // gobblin consumption rate relative to the kafka production rate.
+      // The gobblin rate is computed as (processed record count/elapsed time)
+      // The kafka rate is computed as (expected high watermark - previous latest offset) /
+      //   (current offset fetch epoch time - previous offset fetch epoch time).
       tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(i)));
+      tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+              i)));
+      tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
+          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i)));
+      tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
+          this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, i)));
+
+      if (this.processedRecordCount.containsKey(partition)) {
+        tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
+        tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
+        tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+            this.decodeRecordTime.get(partition))));
+        tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+            this.fetchMessageBufferTime.get(partition))));
+        tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
+            this.readRecordTime.get(partition))));
+      } else {
+        tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
+        tagsForPartition.put(ELAPSED_TIME, "0");
+        tagsForPartition.put(DECODE_RECORD_TIME, "0");
+        tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
+        tagsForPartition.put(READ_RECORD_TIME, "0");
+      }

       tagsForPartitionsMap.put(partition, tagsForPartition);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 69ebea6..b96412c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -104,6 +104,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String ALL_TOPICS = "all";
   public static final String AVG_RECORD_SIZE = "avg.record.size";
   public static final String AVG_RECORD_MILLIS = "avg.record.millis";
+  public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
+  public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
+  public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
   public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
   public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION = "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";

@@ -116,6 +119,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
   private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap();
+  private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap();
   private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet();

@@ -298,7 +303,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       Map<String, State> topicSpecificStateMap, SourceState state) {

     // in case the previous offset not been set
-    getAllPreviousOffsets(state);
+    getAllPreviousOffsetState(state);

     // For each partition that has a previous offset, create an empty WorkUnit for it if
     // it is not in this.partitionsToBeProcessed.
@@ -309,6 +314,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       if (!this.isDatasetStateEnabled.get() || this.topicsToProcess.contains(topicName)) {
         long previousOffset = entry.getValue();
         WorkUnit emptyWorkUnit = createEmptyWorkUnit(partition, previousOffset,
+            this.previousOffsetFetchEpochTimes.get(partition),
             Optional.fromNullable(topicSpecificStateMap.get(partition.getTopicName())));

         if (workUnits.containsKey(topicName)) {
@@ -368,6 +374,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     boolean failedToGetKafkaOffsets = false;

     try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
+      offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
       offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
       offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
     } catch (KafkaOffsetRetrievalFailureException e) {
@@ -375,9 +382,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     }

     long previousOffset = 0;
+    long previousOffsetFetchEpochTime = 0;
     boolean previousOffsetNotFound = false;
     try {
       previousOffset = getPreviousOffsetForPartition(partition, state);
+      offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
+      previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
+      offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
     } catch (PreviousOffsetNotFoundException e) {
       previousOffsetNotFound = true;
     }
@@ -392,7 +403,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       LOG.warn(String
           .format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
               partition));
-      return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
+      return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
+          topicSpecificState);
     }

     if (shouldMoveToLatestOffset(partition, state)) {
@@ -444,7 +456,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
           offsets.startAtEarliestOffset();
         } else {
           LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
-          return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
+          return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
         }
       }
     }
@@ -452,10 +464,24 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
   }

+  private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state)
+      throws PreviousOffsetNotFoundException {
+
+    getAllPreviousOffsetState(state);
+
+    if (this.previousOffsetFetchEpochTimes.containsKey(partition)) {
+      return this.previousOffsetFetchEpochTimes.get(partition);
+    }
+
+    throw new PreviousOffsetNotFoundException(String
+        .format("Previous offset fetch epoch time for topic %s, partition %s not found.", partition.getTopicName(),
+            partition.getId()));
+  }
+
   private long getPreviousOffsetForPartition(KafkaPartition partition, SourceState state)
       throws PreviousOffsetNotFoundException {

-    getAllPreviousOffsets(state);
+    getAllPreviousOffsetState(state);

     if (this.previousOffsets.containsKey(partition)) {
       return this.previousOffsets.get(partition);
@@ -464,12 +490,28 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         .format("Previous offset for topic %s, partition %s not found.", partition.getTopicName(),
             partition.getId()));
   }

-  // need to be synchronized as this.previousOffsets need to be initialized once
-  private synchronized void getAllPreviousOffsets(SourceState state) {
+  private long getPreviousExpectedHighWatermark(KafkaPartition partition, SourceState state)
+      throws PreviousOffsetNotFoundException {
+
+    getAllPreviousOffsetState(state);
+
+    if (this.previousExpectedHighWatermarks.containsKey(partition)) {
+      return this.previousExpectedHighWatermarks.get(partition);
+    }
+    throw new PreviousOffsetNotFoundException(String
+        .format("Previous expected high watermark for topic %s, partition %s not found.", partition.getTopicName(),
+            partition.getId()));
+  }
+
+  // need to be synchronized as this.previousOffsets, this.previousExpectedHighWatermarks, and
+  // this.previousOffsetFetchEpochTimes need to be initialized once
+  private synchronized void getAllPreviousOffsetState(SourceState state) {
     if (this.doneGettingAllPreviousOffsets) {
       return;
     }
     this.previousOffsets.clear();
+    this.previousExpectedHighWatermarks.clear();
+    this.previousOffsetFetchEpochTimes.clear();
     Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns();

     if (!workUnitStatesByDatasetUrns.isEmpty() &&
@@ -481,13 +523,26 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) {
       List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState);
       MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class);
+      MultiLongWatermark previousExpectedHighWatermark =
+          workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class);
       Preconditions.checkArgument(partitions.size() == watermark.size(), String
          .format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions,
              watermark));
+
       for (int i = 0; i < partitions.size(); i++) {
+        KafkaPartition partition = partitions.get(i);
+
         if (watermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
-          this.previousOffsets.put(partitions.get(i), watermark.get(i));
+          this.previousOffsets.put(partition, watermark.get(i));
+        }
+
+        if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) {
+          this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i));
         }
+
+        this.previousOffsetFetchEpochTimes.put(partition,
+            Long.valueOf(workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i),
+                "0")));
       }
     }
@@ -511,12 +566,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   }

   // thread safe
-  private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset,
+  private WorkUnit createEmptyWorkUnit(KafkaPartition partition, long previousOffset, long previousFetchEpochTime,
       Optional<State> topicSpecificState) {
     Offsets offsets = new Offsets();
     offsets.setEarliestOffset(previousOffset);
     offsets.setLatestOffset(previousOffset);
     offsets.startAtEarliestOffset();
+    offsets.setOffsetFetchEpochTime(previousFetchEpochTime);
     return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
   }

@@ -552,6 +608,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+    workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime());
+    workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime());
+    workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset());

     // Add lineage info
     DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
@@ -608,6 +667,18 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     @Setter
     private long latestOffset = 0;

+    @Getter
+    @Setter
+    private long offsetFetchEpochTime = 0;
+
+    @Getter
+    @Setter
+    private long previousOffsetFetchEpochTime = 0;
+
+    @Getter
+    @Setter
+    private long previousLatestOffset = 0;
+
     private void startAt(long offset) throws StartOffsetOutOfRangeException {
       if (offset < this.earliestOffset || offset > this.latestOffset) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a3189d73/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 8d03f4f..0d93796 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -219,6 +219,24 @@ public abstract class KafkaWorkUnitPacker {
     workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
     workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
     workUnit.setWatermarkInterval(interval);
+
+    // Update offset fetch epoch time and previous latest offset. These are used to compute the load factor,
+    // gobblin consumption rate relative to the kafka production rate. The kafka rate is computed as
+    // (current latest offset - previous latest offset)/(current epoch time - previous epoch time).
+    int index = 0;
+    for (WorkUnit wu : multiWorkUnit.getWorkUnits()) {
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, index),
+          wu.getProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, index),
+          wu.getProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME));
+      workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, index),
+          wu.getProp(KafkaSource.PREVIOUS_LATEST_OFFSET));
+      index++;
+    }
+    workUnit.removeProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME);
+    workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME);
+    workUnit.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
+
     // Remove the original partition information
     workUnit.removeProp(KafkaSource.PARTITION_ID);
     workUnit.removeProp(KafkaSource.LEADER_ID);
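
For reference, the sketch below shows how a downstream consumer of the KafkaExtractorTopicMetadata event could derive the per-partition load factor from the tags this commit adds, following the formulas in the code comments above. This is not part of the commit: the class and method names are invented for illustration, the ratio direction (production rate over consumption rate) is one plausible reading of "consumption rate relative to production rate", and it assumes elapsedTime and both offset fetch epoch times are in milliseconds, as emitted by KafkaExtractor.close().

import java.util.Map;

// Hypothetical helper, for illustration only -- not shipped with this commit.
public final class KafkaLoadFactorExample {

  // Computes the load factor for one partition from its tag map in the
  // KafkaExtractorTopicMetadata event. A result > 1 suggests the topic is
  // growing faster than the job consumed it during this run.
  public static double loadFactor(Map<String, String> tags) {
    long processedRecordCount = Long.parseLong(tags.get("processedRecordCount"));
    long elapsedTimeMillis = Long.parseLong(tags.get("elapsedTime"));
    long expectedHighWatermark = Long.parseLong(tags.get("expectedHighWatermark"));
    long previousLatestOffset = Long.parseLong(tags.get("previousLatestOffset"));
    long offsetFetchEpochTime = Long.parseLong(tags.get("offsetFetchEpochTime"));
    long previousOffsetFetchEpochTime = Long.parseLong(tags.get("previousOffsetFetchEpochTime"));

    long fetchIntervalMillis = offsetFetchEpochTime - previousOffsetFetchEpochTime;
    if (processedRecordCount == 0 || elapsedTimeMillis <= 0 || fetchIntervalMillis <= 0) {
      return 0.0; // nothing was pulled, or the epoch-time interval is degenerate
    }

    // Gobblin consumption rate: records pulled per millisecond of extraction time.
    double gobblinRate = (double) processedRecordCount / elapsedTimeMillis;

    // Kafka production rate: offsets produced per millisecond between the two offset fetches.
    double kafkaRate = (double) (expectedHighWatermark - previousLatestOffset) / fetchIntervalMillis;

    return kafkaRate / gobblinRate;
  }
}

The denominators are guarded because close() emits "0" for all five statistics tags when a partition pulled no records, and the first run of a job has no previous offset fetch epoch time.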
