This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 689a3f4  [GOBBLIN-989] Track and report record level SLA in Gobblin Kafka Extra…
689a3f4 is described below
commit 689a3f4374e579aa5dfa0c9515ccf3627833696a
Author: sv2000 <[email protected]>
AuthorDate: Mon Dec 2 17:04:24 2019 -0800
[GOBBLIN-989] Track and report record level SLA in Gobblin Kafka Extra…
Closes #2835 from sv2000/kafkaGetTimestamp
---
.../gobblin/kafka/client/KafkaConsumerRecord.java | 12 ++++++++++++
.../extractor/extract/kafka/KafkaExtractor.java | 3 ++-
.../extract/kafka/KafkaExtractorStatsTracker.java | 20 +++++++++++++++++++-
.../source/extractor/extract/kafka/KafkaSource.java | 16 +++++++++++++++-
.../kafka/KafkaExtractorStatsTrackerTest.java | 21 +++++++++++++++------
5 files changed, 63 insertions(+), 9 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index 5308a18..3279508 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -38,4 +38,16 @@ public interface KafkaConsumerRecord {
* does not provide size (like Kafka 09 clients)
*/
public long getValueSizeInBytes();
+
+ /**
+ * @return the timestamp of the underlying ConsumerRecord.
+ */
+ public default long getTimestamp() { return 0; }
+
+ /**
+   * @return true if the timestamp in the ConsumerRecord is the timestamp when the record is written to Kafka.
+ */
+ public default boolean isTimestampLogAppend() {
+ return false;
+ }
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 11e17bf..d65dfb2 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -176,7 +176,8 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
D record = decodeKafkaMessage(nextValidMessage);
- this.statsTracker.onDecodeableRecord(this.currentPartitionIdx,
readStartTime, decodeStartTime, nextValidMessage.getValueSizeInBytes());
+ this.statsTracker.onDecodeableRecord(this.currentPartitionIdx,
readStartTime, decodeStartTime,
+ nextValidMessage.getValueSizeInBytes(),
nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L);
this.currentPartitionLastSuccessfulRecord = record;
return record;
} catch (Throwable t) {
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index db61731..28caf84 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -50,6 +50,7 @@ public class KafkaExtractorStatsTracker {
private static final String EXPECTED_HIGH_WATERMARK =
"expectedHighWatermark";
private static final String ELAPSED_TIME = "elapsedTime";
private static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+ private static final String SLA_MISSED_RECORD_COUNT = "slaMissedRecordCount";
private static final String UNDECODABLE_MESSAGE_COUNT =
"undecodableMessageCount";
private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
@@ -63,6 +64,8 @@ public class KafkaExtractorStatsTracker {
private final Map<KafkaPartition, ExtractorStats> statsMap;
private final Set<Integer> errorPartitions;
private final WorkUnitState workUnitState;
+ private boolean isSlaConfigured;
+ private long recordLevelSlaMillis;
  //A global count of number of undecodeable messages encountered by the KafkaExtractor across all Kafka
//TopicPartitions.
@@ -76,6 +79,10 @@ public class KafkaExtractorStatsTracker {
this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.partitions.forEach(partition -> this.statsMap.put(partition, new
ExtractorStats()));
this.errorPartitions = Sets.newHashSet();
+ if (this.workUnitState.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY))
{
+ this.isSlaConfigured = true;
+ this.recordLevelSlaMillis =
TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
+ }
}
public int getErrorPartitionCount() {
@@ -92,6 +99,7 @@ public class KafkaExtractorStatsTracker {
private long avgRecordSize;
private long elapsedTime;
private long processedRecordCount;
+ private long slaMissedRecordCount = -1L;
private long partitionTotalSize;
private long decodeRecordTime;
private long fetchMessageBufferTime;
@@ -143,14 +151,23 @@ public class KafkaExtractorStatsTracker {
* @param readStartTime the start time when readRecord() is invoked.
* @param decodeStartTime the time instant immediately before a record
decoding begins.
* @param recordSizeInBytes the size of the decoded record in bytes.
+   * @param logAppendTimestamp the log append time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}.
*/
- public void onDecodeableRecord(int partitionIdx, long readStartTime, long
decodeStartTime, long recordSizeInBytes) {
+ public void onDecodeableRecord(int partitionIdx, long readStartTime, long
decodeStartTime, long recordSizeInBytes, long logAppendTimestamp) {
this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v)
-> {
long currentTime = System.nanoTime();
v.processedRecordCount++;
v.partitionTotalSize += recordSizeInBytes;
v.decodeRecordTime += currentTime - decodeStartTime;
v.readRecordTime += currentTime - readStartTime;
+ if (this.isSlaConfigured) {
+ if (v.slaMissedRecordCount < 0) {
+ v.slaMissedRecordCount = 0;
+ }
+ if (logAppendTimestamp > 0 && (System.currentTimeMillis() -
logAppendTimestamp > recordLevelSlaMillis)) {
+ v.slaMissedRecordCount++;
+ }
+ }
return v;
});
}
@@ -249,6 +266,7 @@ public class KafkaExtractorStatsTracker {
this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME,
partitionId),
Long.toString(stats.getStopFetchEpochTime()));
tagsForPartition.put(PROCESSED_RECORD_COUNT,
Long.toString(stats.getProcessedRecordCount()));
+ tagsForPartition.put(SLA_MISSED_RECORD_COUNT,
Long.toString(stats.getSlaMissedRecordCount()));
tagsForPartition.put(PARTITION_TOTAL_SIZE,
Long.toString(stats.getPartitionTotalSize()));
tagsForPartition.put(AVG_RECORD_SIZE,
Long.toString(stats.getAvgRecordSize()));
tagsForPartition.put(ELAPSED_TIME, Long.toString(stats.getElapsedTime()));
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index ac10acd..611e55e 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -124,6 +124,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
"gobblin.kafka.shouldEnableDatasetStateStore";
public static final boolean
DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
+  public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
private final Set<String> moveToLatestTopics =
Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
private final Map<KafkaPartition, Long> previousOffsets =
Maps.newConcurrentMap();
@@ -534,8 +535,21 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
}
}
}
+ WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets,
topicSpecificState);
+ addSourceStatePropsToWorkUnit(workUnit, state);
+ return workUnit;
+ }
- return getWorkUnitForTopicPartition(partition, offsets,
topicSpecificState);
+ /**
+ * A method to copy specific properties from the {@link SourceState} object
to {@link WorkUnitState}.
+ * @param workUnit WorkUnit state
+ * @param state Source state
+ */
+ private void addSourceStatePropsToWorkUnit(WorkUnit workUnit, SourceState
state) {
+ //Copy the SLA config from SourceState to WorkUnitState.
+ if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
+ workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY,
state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
+ }
}
private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition
partition, SourceState state) {
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 6dbb387..2022ba7 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -34,7 +34,9 @@ public class KafkaExtractorStatsTrackerTest {
public void setUp() {
kafkaPartitions.add(new
KafkaPartition.Builder().withTopicName("test-topic").withId(0).build());
kafkaPartitions.add(new
KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
- this.extractorStatsTracker = new KafkaExtractorStatsTracker(new
WorkUnitState(), kafkaPartitions);
+ WorkUnitState workUnitState = new WorkUnitState();
+ workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 10L);
+ this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState,
kafkaPartitions);
}
@Test
@@ -69,29 +71,33 @@ public class KafkaExtractorStatsTrackerTest {
long readStartTime = System.nanoTime();
Thread.sleep(1);
long decodeStartTime = System.nanoTime();
-
+ long logAppendTimestamp = System.currentTimeMillis() - 15 * 60 * 1000L;
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(),
0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(),
0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
== 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
== 0);
+
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
-1);
- this.extractorStatsTracker.onDecodeableRecord(0, readStartTime,
decodeStartTime, 100);
+ this.extractorStatsTracker.onDecodeableRecord(0, readStartTime,
decodeStartTime, 100, logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(),
1);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(),
100);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
> 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
> 0);
+
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
1);
readStartTime = System.nanoTime();
Thread.sleep(1);
decodeStartTime = System.nanoTime();
+ logAppendTimestamp = System.currentTimeMillis() - 10;
long previousDecodeRecordTime =
this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
long previousReadRecordTime =
this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
- this.extractorStatsTracker.onDecodeableRecord(0, readStartTime,
decodeStartTime, 100);
+ this.extractorStatsTracker.onDecodeableRecord(0, readStartTime,
decodeStartTime, 100, logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(),
2);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(),
200);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
> previousDecodeRecordTime);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
> previousReadRecordTime);
+
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
1);
}
@Test
@@ -126,13 +132,15 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(),
100);
readStartTime = System.nanoTime();
+ long logAppendTimestamp = System.currentTimeMillis() - 10;
Thread.sleep(1);
long decodeStartTime = System.nanoTime();
- this.extractorStatsTracker.onDecodeableRecord(1, readStartTime,
decodeStartTime, 100);
+ this.extractorStatsTracker.onDecodeableRecord(1, readStartTime,
decodeStartTime, 100, logAppendTimestamp);
this.extractorStatsTracker.updateStatisticsForCurrentPartition(1,
readStartTime, 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime()
> 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord()
> 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(),
100);
+
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(),
0);
}
@Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -143,7 +151,8 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0);
long readStartTime = System.nanoTime();
long decodeStartTime = readStartTime + 1;
- this.extractorStatsTracker.onDecodeableRecord(1, readStartTime,
decodeStartTime, 150);
+ long logAppendTimeStamp = System.currentTimeMillis() - 10;
+ this.extractorStatsTracker.onDecodeableRecord(1, readStartTime,
decodeStartTime, 150, logAppendTimeStamp);
Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
}
}
\ No newline at end of file