This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 689a3f4  [GOBBLIN-989] Track and report record level SLA in Gobblin Kafka Extra…
689a3f4 is described below

commit 689a3f4374e579aa5dfa0c9515ccf3627833696a
Author: sv2000 <[email protected]>
AuthorDate: Mon Dec 2 17:04:24 2019 -0800

    [GOBBLIN-989] Track and report record level SLA in Gobblin Kafka Extra…
    
    Closes #2835 from sv2000/kafkaGetTimestamp
---
 .../gobblin/kafka/client/KafkaConsumerRecord.java   | 12 ++++++++++++
 .../extractor/extract/kafka/KafkaExtractor.java     |  3 ++-
 .../extract/kafka/KafkaExtractorStatsTracker.java   | 20 +++++++++++++++++++-
 .../source/extractor/extract/kafka/KafkaSource.java | 16 +++++++++++++++-
 .../kafka/KafkaExtractorStatsTrackerTest.java       | 21 +++++++++++++++------
 5 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index 5308a18..3279508 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -38,4 +38,16 @@ public interface KafkaConsumerRecord {
    * does not provide size (like Kafka 09 clients)
    */
   public long getValueSizeInBytes();
+
+  /**
+   * @return the timestamp of the underlying ConsumerRecord.
+   */
+  public default long getTimestamp() { return 0; }
+
+  /**
+   * @return true if the timestamp in the ConsumerRecord is the timestamp when the record is written to Kafka.
+   */
+  public default boolean isTimestampLogAppend() {
+    return false;
+  }
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 11e17bf..d65dfb2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -176,7 +176,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
           D record = decodeKafkaMessage(nextValidMessage);
 
-          this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime, nextValidMessage.getValueSizeInBytes());
+          this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime,
+              nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L);
           this.currentPartitionLastSuccessfulRecord = record;
           return record;
         } catch (Throwable t) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index db61731..28caf84 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -50,6 +50,7 @@ public class KafkaExtractorStatsTracker {
   private static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
   private static final String ELAPSED_TIME = "elapsedTime";
   private static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+  private static final String SLA_MISSED_RECORD_COUNT = "slaMissedRecordCount";
   private static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
   private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
   private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
@@ -63,6 +64,8 @@ public class KafkaExtractorStatsTracker {
   private final Map<KafkaPartition, ExtractorStats> statsMap;
   private final Set<Integer> errorPartitions;
   private final WorkUnitState workUnitState;
+  private boolean isSlaConfigured;
+  private long recordLevelSlaMillis;
 
   //A global count of number of undecodeable messages encountered by the KafkaExtractor across all Kafka
   //TopicPartitions.
@@ -76,6 +79,10 @@ public class KafkaExtractorStatsTracker {
     this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
     this.errorPartitions = Sets.newHashSet();
+    if (this.workUnitState.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
+      this.isSlaConfigured = true;
+      this.recordLevelSlaMillis = TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
+    }
   }
 
   public int getErrorPartitionCount() {
@@ -92,6 +99,7 @@ public class KafkaExtractorStatsTracker {
     private long avgRecordSize;
     private long elapsedTime;
     private long processedRecordCount;
+    private long slaMissedRecordCount = -1L;
     private long partitionTotalSize;
     private long decodeRecordTime;
     private long fetchMessageBufferTime;
@@ -143,14 +151,23 @@ public class KafkaExtractorStatsTracker {
    * @param readStartTime the start time when readRecord() is invoked.
    * @param decodeStartTime the time instant immediately before a record decoding begins.
    * @param recordSizeInBytes the size of the decoded record in bytes.
+   * @param logAppendTimestamp the log append time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}.
    */
-  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes) {
+  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp) {
     this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
       long currentTime = System.nanoTime();
       v.processedRecordCount++;
       v.partitionTotalSize += recordSizeInBytes;
       v.decodeRecordTime += currentTime - decodeStartTime;
       v.readRecordTime += currentTime - readStartTime;
+      if (this.isSlaConfigured) {
+        if (v.slaMissedRecordCount < 0) {
+          v.slaMissedRecordCount = 0;
+        }
+        if (logAppendTimestamp > 0 && (System.currentTimeMillis() - logAppendTimestamp > recordLevelSlaMillis)) {
+          v.slaMissedRecordCount++;
+        }
+      }
       return v;
     });
   }
@@ -249,6 +266,7 @@ public class KafkaExtractorStatsTracker {
     this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
         Long.toString(stats.getStopFetchEpochTime()));
     tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(stats.getProcessedRecordCount()));
+    tagsForPartition.put(SLA_MISSED_RECORD_COUNT, Long.toString(stats.getSlaMissedRecordCount()));
     tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(stats.getPartitionTotalSize()));
     tagsForPartition.put(AVG_RECORD_SIZE, Long.toString(stats.getAvgRecordSize()));
     tagsForPartition.put(ELAPSED_TIME, Long.toString(stats.getElapsedTime()));
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index ac10acd..611e55e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -124,6 +124,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       "gobblin.kafka.shouldEnableDatasetStateStore";
   public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
   public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
+  public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
 
   private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
   private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
@@ -534,8 +535,21 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         }
       }
     }
+    WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
+    addSourceStatePropsToWorkUnit(workUnit, state);
+    return workUnit;
+  }
 
-    return getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
+  /**
+   * A method to copy specific properties from the {@link SourceState} object to {@link WorkUnitState}.
+   * @param workUnit WorkUnit state
+   * @param state Source state
+   */
+  private void addSourceStatePropsToWorkUnit(WorkUnit workUnit, SourceState state) {
+    //Copy the SLA config from SourceState to WorkUnitState.
+    if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
+      workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
+    }
   }
 
   private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 6dbb387..2022ba7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -34,7 +34,9 @@ public class KafkaExtractorStatsTrackerTest {
   public void setUp() {
     kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build());
     kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
-    this.extractorStatsTracker = new KafkaExtractorStatsTracker(new WorkUnitState(), kafkaPartitions);
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 10L);
+    this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
   }
 
   @Test
@@ -69,29 +71,33 @@ public class KafkaExtractorStatsTrackerTest {
     long readStartTime = System.nanoTime();
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
-
+    long logAppendTimestamp = System.currentTimeMillis() - 15 * 60 * 1000L;
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() == 0);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), -1);
 
-    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100);
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 100);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() > 0);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
 
     readStartTime = System.nanoTime();
     Thread.sleep(1);
     decodeStartTime = System.nanoTime();
+    logAppendTimestamp = System.currentTimeMillis() - 10;
     long previousDecodeRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
     long previousReadRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
 
-    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100);
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 2);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 200);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > previousDecodeRecordTime);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() > previousReadRecordTime);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
   }
 
   @Test
@@ -126,13 +132,15 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(), 100);
 
     readStartTime = System.nanoTime();
+    long logAppendTimestamp = System.currentTimeMillis() - 10;
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
-    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100);
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp);
     this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime() > 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(), 100);
+    Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(), 0);
   }
 
   @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -143,7 +151,8 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0);
     long readStartTime = System.nanoTime();
     long decodeStartTime = readStartTime + 1;
-    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150);
+    long logAppendTimeStamp = System.currentTimeMillis() - 10;
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimeStamp);
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
   }
 }
\ No newline at end of file

Reply via email to