Repository: incubator-gobblin Updated Branches: refs/heads/master ef59a1517 -> fcc4d412a
[GOBBLIN-589] Add more statistics to KafkaExtractor tracking event - Added emitting start/stop fetch epoch time statistics as well as partition total size - Added lagging and emitting of fetch epoch times and watermark statistics from previous run - Made changes to allow subclasses of KafkaExtractor to emit statistics based on the lastSuccessfulRecord Closes #2455 from cshen98/metrics1 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fcc4d412 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fcc4d412 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fcc4d412 Branch: refs/heads/master Commit: fcc4d412afa40a83ec1fccb25d76ea146ff1164b Parents: ef59a15 Author: Carl Shen <[email protected]> Authored: Mon Sep 17 11:03:43 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Sep 17 11:03:43 2018 -0700 ---------------------------------------------------------------------- .../extractor/extract/kafka/KafkaExtractor.java | 173 +++++++++++-------- .../extractor/extract/kafka/KafkaSource.java | 82 ++++++++- .../extractor/extract/kafka/KafkaUtils.java | 14 ++ .../workunit/packer/KafkaWorkUnitPacker.java | 12 ++ 4 files changed, 209 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java index 0ec3caf..664446f 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -71,6 +70,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark"; public static final String ELAPSED_TIME = "elapsedTime"; public static final String PROCESSED_RECORD_COUNT = "processedRecordCount"; + public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize"; public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime"; public static final String READ_RECORD_TIME = "readRecordTime"; public static final String DECODE_RECORD_TIME = "decodeRecordTime"; @@ -87,16 +87,17 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { protected final GobblinKafkaConsumerClient kafkaConsumerClient; private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver; - protected final Stopwatch stopwatch; - protected final Map<KafkaPartition, Integer> decodingErrorCount; private final Map<KafkaPartition, Double> avgMillisPerRecord; private final Map<KafkaPartition, Long> avgRecordSizes; private final Map<KafkaPartition, Long> elapsedTime; private final Map<KafkaPartition, Long> processedRecordCount; + private final Map<KafkaPartition, Long> partitionTotalSize; private final Map<KafkaPartition, Long> decodeRecordTime; private final Map<KafkaPartition, Long> fetchMessageBufferTime; private final Map<KafkaPartition, Long> readRecordTime; + private final Map<KafkaPartition, Long> startFetchEpochTime; + private final Map<KafkaPartition, Long> stopFetchEpochTime; private final Set<Integer> errorPartitions; private int undecodableMessageCount = 0; @@ -105,10 +106,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { private int currentPartitionIdx = INITIAL_PARTITION_IDX; private long currentPartitionRecordCount = 0; private long currentPartitionTotalSize = 0; - private long currentPartitionFetchDuration = 0; private long currentPartitionDecodeRecordTime = 0; private long currentPartitionFetchMessageBufferTime = 0; private long currentPartitionReadRecordTime = 0; + protected D currentPartitionLastSuccessfulRecord = null; public KafkaExtractor(WorkUnitState state) { super(state); @@ -130,16 +131,17 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { throw new RuntimeException(e); } - this.stopwatch = Stopwatch.createUnstarted(); - this.decodingErrorCount = Maps.newHashMap(); this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size()); this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size()); this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size()); this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size()); + this.partitionTotalSize = Maps.newHashMapWithExpectedSize(this.partitions.size()); this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size()); this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size()); this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size()); + this.startFetchEpochTime = Maps.newHashMapWithExpectedSize(this.partitions.size()); + this.stopFetchEpochTime= Maps.newHashMapWithExpectedSize(this.partitions.size()); this.errorPartitions = Sets.newHashSet(); @@ -224,6 +226,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { this.currentPartitionRecordCount++; this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes(); this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime; + this.currentPartitionLastSuccessfulRecord = record; return record; } catch (Throwable t) { this.errorPartitions.add(this.currentPartitionIdx); @@ -268,10 +271,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { updateStatisticsForCurrentPartition(); this.currentPartitionIdx++; this.currentPartitionRecordCount = 0; - this.currentPartitionFetchDuration = 0; + this.currentPartitionTotalSize = 0; this.currentPartitionDecodeRecordTime = 0; this.currentPartitionFetchMessageBufferTime = 0; this.currentPartitionReadRecordTime = 0; + this.currentPartitionLastSuccessfulRecord = null; } this.messageIterator = null; @@ -281,29 +285,36 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { this.highWatermark.get(this.currentPartitionIdx) - this.nextWatermark.get(this.currentPartitionIdx))); switchMetricContextToCurrentPartition(); } - this.stopwatch.start(); + + if (!allPartitionsFinished()) { + this.startFetchEpochTime.put(this.getCurrentPartition(), System.currentTimeMillis()); + } } - private void updateStatisticsForCurrentPartition() { - this.stopwatch.stop(); + protected void updateStatisticsForCurrentPartition() { + long stopFetchEpochTime = System.currentTimeMillis(); + + if (!allPartitionsFinished()) { + this.stopFetchEpochTime.put(this.getCurrentPartition(), stopFetchEpochTime); + } if (this.currentPartitionRecordCount != 0) { - this.currentPartitionFetchDuration = this.stopwatch.elapsed(TimeUnit.MILLISECONDS); + long currentPartitionFetchDuration = + stopFetchEpochTime - this.startFetchEpochTime.get(this.getCurrentPartition()); double avgMillisForCurrentPartition = - (double) this.currentPartitionFetchDuration / (double) this.currentPartitionRecordCount; + (double) currentPartitionFetchDuration / (double) this.currentPartitionRecordCount; this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition); long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount; this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize); - this.elapsedTime.put(this.getCurrentPartition(), this.currentPartitionFetchDuration); + this.elapsedTime.put(this.getCurrentPartition(), currentPartitionFetchDuration); this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount); + this.partitionTotalSize.put(this.getCurrentPartition(), this.currentPartitionTotalSize); this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime); this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime); this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime); } - - this.stopwatch.reset(); } private void switchMetricContextToCurrentPartition() { @@ -367,65 +378,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.errorPartitions.size()); this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.undecodableMessageCount); - // Commit actual high watermark for each partition for (int i = 0; i < this.partitions.size(); i++) { LOG.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i), this.nextWatermark.get(i), this.highWatermark.get(i))); - - Map<String, String> tagsForPartition = Maps.newHashMap(); - KafkaPartition partition = this.partitions.get(i); - tagsForPartition.put(TOPIC, partition.getTopicName()); - tagsForPartition.put(PARTITION, Integer.toString(partition.getId())); - tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(i))); - tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(i))); - // These are used to compute the load factor, - // gobblin consumption rate relative to the kafka production rate. - // The gobblin rate is computed as (processed record count/elapsed time) - // The kafka rate is computed as (expected high watermark - previous latest offset) / - // (current offset fetch epoch time - previous offset fetch epoch time). - tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(i))); - tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, - this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, - i))); - tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME, - this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i))); - tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET, - this.workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LATEST_OFFSET, i))); - - if (this.processedRecordCount.containsKey(partition)) { - tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition))); - tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition))); - tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis( - this.decodeRecordTime.get(partition)))); - tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis( - this.fetchMessageBufferTime.get(partition)))); - tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis( - this.readRecordTime.get(partition)))); - } else { - tagsForPartition.put(PROCESSED_RECORD_COUNT, "0"); - tagsForPartition.put(ELAPSED_TIME, "0"); - tagsForPartition.put(DECODE_RECORD_TIME, "0"); - tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0"); - tagsForPartition.put(READ_RECORD_TIME, "0"); - } - - tagsForPartitionsMap.put(partition, tagsForPartition); + tagsForPartitionsMap.put(this.partitions.get(i), createTagsForPartition(i)); } this.workUnitState.setActualHighWatermark(this.nextWatermark); - // Commit avg time to pull a record for each partition - for (KafkaPartition partition : this.partitions) { - if (this.avgMillisPerRecord.containsKey(partition)) { - double avgMillis = this.avgMillisPerRecord.get(partition); - LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis)); - KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis); - tagsForPartitionsMap.get(partition).put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis)); - } else { - LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition)); - tagsForPartitionsMap.get(partition).put(AVG_RECORD_PULL_TIME, Double.toString(-1)); - } - } - if (isInstrumentationEnabled()) { for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) { new EventSubmitter.Builder(getMetricContext(), GOBBLIN_KAFKA_NAMESPACE).build() @@ -436,6 +395,84 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { this.closer.close(); } + protected Map<String, String> createTagsForPartition(int partitionId) { + Map<String, String> tagsForPartition = Maps.newHashMap(); + KafkaPartition partition = this.partitions.get(partitionId); + + tagsForPartition.put(TOPIC, partition.getTopicName()); + tagsForPartition.put(PARTITION, Integer.toString(partition.getId())); + tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(partitionId))); + tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(partitionId))); + + // These are used to compute the load factor, + // gobblin consumption rate relative to the kafka production rate. + // The gobblin rate is computed as (processed record count/elapsed time) + // The kafka rate is computed as (expected high watermark - previous latest offset) / + // (current offset fetch epoch time - previous offset fetch epoch time). + tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(partitionId))); + tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId))); + tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId))); + tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId))); + + tagsForPartition.put(KafkaSource.PREVIOUS_LOW_WATERMARK, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId))); + tagsForPartition.put(KafkaSource.PREVIOUS_HIGH_WATERMARK, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId))); + tagsForPartition.put(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId))); + tagsForPartition.put(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, + Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, + KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId))); + + tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.get(partition))); + tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.get(partition))); + this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId), + Long.toString(this.startFetchEpochTime.get(partition))); + this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId), + Long.toString(this.stopFetchEpochTime.get(partition))); + + if (this.processedRecordCount.containsKey(partition)) { + tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition))); + tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(this.partitionTotalSize.get(partition))); + tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition))); + tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis( + this.decodeRecordTime.get(partition)))); + tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis( + this.fetchMessageBufferTime.get(partition)))); + tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis( + this.readRecordTime.get(partition)))); + } else { + tagsForPartition.put(PROCESSED_RECORD_COUNT, "0"); + tagsForPartition.put(PARTITION_TOTAL_SIZE, "0"); + tagsForPartition.put(ELAPSED_TIME, "0"); + tagsForPartition.put(DECODE_RECORD_TIME, "0"); + tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0"); + tagsForPartition.put(READ_RECORD_TIME, "0"); + } + + // Commit avg time to pull a record for each partition + if (this.avgMillisPerRecord.containsKey(partition)) { + double avgMillis = this.avgMillisPerRecord.get(partition); + LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis)); + KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis); + tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis)); + } else { + LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition)); + tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1)); + } + + return tagsForPartition; + } + @Deprecated @Override public long getHighWatermark() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index b96412c..e7d7da0 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -104,6 +104,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { public static final String ALL_TOPICS = "all"; public static final String AVG_RECORD_SIZE = "avg.record.size"; public static final String AVG_RECORD_MILLIS = "avg.record.millis"; + public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime"; + public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime"; + public static final String PREVIOUS_START_FETCH_EPOCH_TIME = "previousStartFetchEpochTime"; + public static final String PREVIOUS_STOP_FETCH_EPOCH_TIME = "previousStopFetchEpochTime"; + public static final String PREVIOUS_LOW_WATERMARK = "previousLowWatermark"; + public static final String PREVIOUS_HIGH_WATERMARK = "previousHighWatermark"; public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset"; public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime"; public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime"; @@ -119,8 +125,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap(); + private final Map<KafkaPartition, Long> previousLowWatermarks = Maps.newConcurrentMap(); private final Map<KafkaPartition, Long> previousExpectedHighWatermarks = Maps.newConcurrentMap(); private final Map<KafkaPartition, Long> previousOffsetFetchEpochTimes = Maps.newConcurrentMap(); + private final Map<KafkaPartition, Long> previousStartFetchEpochTimes = Maps.newConcurrentMap(); + private final Map<KafkaPartition, Long> previousStopFetchEpochTimes = Maps.newConcurrentMap(); private final Set<KafkaPartition> partitionsToBeProcessed = Sets.newConcurrentHashSet(); @@ -386,6 +395,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { boolean previousOffsetNotFound = false; try { previousOffset = getPreviousOffsetForPartition(partition, state); + offsets.setPreviousEndOffset(previousOffset); + offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state)); + offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state)); + offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state)); offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state)); previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state); offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime); @@ -464,6 +477,18 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState); } + private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) { + getAllPreviousOffsetState(state); + return this.previousStartFetchEpochTimes.containsKey(partition) ? + this.previousStartFetchEpochTimes.get(partition) : 0; + } + + private long getPreviousStopFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) { + getAllPreviousOffsetState(state); + return this.previousStopFetchEpochTimes.containsKey(partition) ? + this.previousStopFetchEpochTimes.get(partition) : 0; + } + private long getPreviousOffsetFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) throws PreviousOffsetNotFoundException { @@ -503,6 +528,19 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { partition.getId())); } + private long getPreviousLowWatermark(KafkaPartition partition, SourceState state) + throws PreviousOffsetNotFoundException { + + getAllPreviousOffsetState(state); + + if (this.previousLowWatermarks.containsKey(partition)) { + return this.previousLowWatermarks.get(partition); + } + throw new PreviousOffsetNotFoundException(String + .format("Previous low watermark for topic %s, partition %s not found.", partition.getTopicName(), + partition.getId())); + } + // need to be synchronized as this.previousOffsets, this.previousExpectedHighWatermarks, and // this.previousOffsetFetchEpochTimes need to be initialized once private synchronized void getAllPreviousOffsetState(SourceState state) { @@ -510,8 +548,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { return; } this.previousOffsets.clear(); + this.previousLowWatermarks.clear(); this.previousExpectedHighWatermarks.clear(); this.previousOffsetFetchEpochTimes.clear(); + this.previousStartFetchEpochTimes.clear(); + this.previousStopFetchEpochTimes.clear(); Map<String, Iterable<WorkUnitState>> workUnitStatesByDatasetUrns = state.getPreviousWorkUnitStatesByDatasetUrns(); if (!workUnitStatesByDatasetUrns.isEmpty() && @@ -522,9 +563,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { for (WorkUnitState workUnitState : state.getPreviousWorkUnitStates()) { List<KafkaPartition> partitions = KafkaUtils.getPartitions(workUnitState); + WorkUnit workUnit = workUnitState.getWorkunit(); + MultiLongWatermark watermark = workUnitState.getActualHighWatermark(MultiLongWatermark.class); - MultiLongWatermark previousExpectedHighWatermark = - workUnitState.getWorkunit().getExpectedHighWatermark(MultiLongWatermark.class); + MultiLongWatermark previousLowWatermark = workUnit.getLowWatermark(MultiLongWatermark.class); + MultiLongWatermark previousExpectedHighWatermark = workUnit.getExpectedHighWatermark(MultiLongWatermark.class); Preconditions.checkArgument(partitions.size() == watermark.size(), String .format("Num of partitions doesn't match number of watermarks: partitions=%s, watermarks=%s", partitions, watermark)); @@ -536,13 +579,22 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { this.previousOffsets.put(partition, watermark.get(i)); } + if (previousLowWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) { + this.previousLowWatermarks.put(partition, previousLowWatermark.get(i)); + } + if (previousExpectedHighWatermark.get(i) != ConfigurationKeys.DEFAULT_WATERMARK_VALUE) { this.previousExpectedHighWatermarks.put(partition, previousExpectedHighWatermark.get(i)); } this.previousOffsetFetchEpochTimes.put(partition, - Long.valueOf(workUnitState.getProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, i), - "0"))); + KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, OFFSET_FETCH_EPOCH_TIME, i)); + + this.previousStartFetchEpochTimes.put(partition, + KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, START_FETCH_EPOCH_TIME, i)); + + this.previousStopFetchEpochTimes.put(partition, + KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(workUnitState, STOP_FETCH_EPOCH_TIME, i)); } } @@ -608,6 +660,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString()); workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset()); workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset()); + workUnit.setProp(PREVIOUS_START_FETCH_EPOCH_TIME, offsets.getPreviousStartFetchEpochTime()); + workUnit.setProp(PREVIOUS_STOP_FETCH_EPOCH_TIME, offsets.getPreviousStopFetchEpochTime()); + workUnit.setProp(PREVIOUS_LOW_WATERMARK, offsets.getPreviousStartOffset()); + workUnit.setProp(PREVIOUS_HIGH_WATERMARK, offsets.getPreviousEndOffset()); workUnit.setProp(PREVIOUS_OFFSET_FETCH_EPOCH_TIME, offsets.getPreviousOffsetFetchEpochTime()); workUnit.setProp(OFFSET_FETCH_EPOCH_TIME, offsets.getOffsetFetchEpochTime()); workUnit.setProp(PREVIOUS_LATEST_OFFSET, offsets.getPreviousLatestOffset()); @@ -679,6 +735,24 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { @Setter private long previousLatestOffset = 0; + // previous low watermark + @Getter + @Setter + private long previousStartOffset = 0; + + // previous actual high watermark + @Getter + @Setter + private long previousEndOffset = 0; + + @Getter + @Setter + private long previousStartFetchEpochTime = 0; + + @Getter + @Setter + private long previousStopFetchEpochTime = 0; + private void startAt(long offset) throws StartOffsetOutOfRangeException { if (offset < this.earliestOffset || offset > this.latestOffset) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java index 55ecab4..5472134 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java @@ -19,6 +19,7 @@ package org.apache.gobblin.source.extractor.extract.kafka; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; import java.util.List; @@ -168,4 +169,17 @@ public class KafkaUtils { getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + KafkaSource.AVG_RECORD_MILLIS, millis); } + + /** + * Get a property as long from a work unit that may or may not be a multiworkunit. + * This method is needed because the SingleLevelWorkUnitPacker does not squeeze work units + * into a multiworkunit, and thus does not append the partitionId to property keys, while + * the BiLevelWorkUnitPacker does. + * Return 0 as default if key not found in either form. + */ + public static long getPropAsLongFromSingleOrMultiWorkUnitState(WorkUnitState workUnitState, + String key, int partitionId) { + return Long.parseLong(workUnitState.contains(key) ? workUnitState.getProp(key) + : workUnitState.getProp(KafkaUtils.getPartitionPropName(key, partitionId), "0")); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcc4d412/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java index 0d93796..fef3219 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java @@ -225,6 +225,14 @@ public abstract class KafkaWorkUnitPacker { // (current latest offset - previous latest offset)/(current epoch time - previous epoch time). int index = 0; for (WorkUnit wu : multiWorkUnit.getWorkUnits()) { + workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, index), + wu.getProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME)); + workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, index), + wu.getProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME)); + workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_LOW_WATERMARK, index), + wu.getProp(KafkaSource.PREVIOUS_LOW_WATERMARK)); + workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_HIGH_WATERMARK, index), + wu.getProp(KafkaSource.PREVIOUS_HIGH_WATERMARK)); workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, index), wu.getProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME)); workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.OFFSET_FETCH_EPOCH_TIME, index), @@ -233,6 +241,10 @@ public abstract class KafkaWorkUnitPacker { wu.getProp(KafkaSource.PREVIOUS_LATEST_OFFSET)); index++; } + workUnit.removeProp(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME); + workUnit.removeProp(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME); + workUnit.removeProp(KafkaSource.PREVIOUS_LOW_WATERMARK); + workUnit.removeProp(KafkaSource.PREVIOUS_HIGH_WATERMARK); workUnit.removeProp(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME); workUnit.removeProp(KafkaSource.OFFSET_FETCH_EPOCH_TIME); workUnit.removeProp(KafkaSource.PREVIOUS_LATEST_OFFSET);
