This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 077a10ac41 [GOBBLIN-2114] Add fields to show computed delta between 
consume and … (#4005)
077a10ac41 is described below

commit 077a10ac4182ad64865eb4985df15ee08de9a971
Author: Wei-Hsiang (Max) Lin <[email protected]>
AuthorDate: Thu Jul 18 14:53:15 2024 -0700

    [GOBBLIN-2114] Add fields to show computed delta between consume and … 
(#4005)
    
    * [GOBBLIN-2114] Add fields to show computed delta between consume and 
producer/append time respectively
---
 .../extract/kafka/KafkaExtractorStatsTracker.java       | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index eec0d09c46..d88f026bb7 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -79,6 +79,8 @@ public class KafkaExtractorStatsTracker {
   private static final String FETCH_MESSAGE_BUFFER_TIME = 
"fetchMessageBufferTime";
   private static final String LAST_RECORD_HEADER_TIMESTAMP = 
"lastRecordHeaderTimestamp";
   private static final String OBSERVED_LATENCY_HISTOGRAM = 
"observedLatencyHistogram";
+  private static final String CREATION_DURATION_TIME = "creationDurationTime";
+  private static final String APPEND_DURATION_TIME = "appendDurationTime";
 
   @Getter
   private final Map<KafkaPartition, ExtractorStats> statsMap;
@@ -183,6 +185,8 @@ public class KafkaExtractorStatsTracker {
     private long minLogAppendTime = -1L;
     private long maxLogAppendTime = -1L;
     private long minRecordCreationTime = -1L;
+    private long creationDurationTime = -1L;
+    private long appendDurationTime = -1L;
   }
 
   /**
@@ -306,6 +310,17 @@ public class KafkaExtractorStatsTracker {
         if (logAppendTimestamp > 0 && (System.currentTimeMillis() - 
logAppendTimestamp > recordLevelSlaMillis)) {
           v.slaMissedRecordCount++;
         }
+        // This experiment tracks the SLA for record creation times after 
migrating to Xinfra.
+        // It compares the time taken for appending records versus the time 
taken for creating records.
+        // The goal is to identify any potential impact on the SLA. 
+        // Here we capture the time elapsed between when the record was appended 
and when it was consumed by the extractor.
+        if (logAppendTimestamp > 0) {
+          v.appendDurationTime = System.currentTimeMillis() - 
logAppendTimestamp;        
+        }
+        // Here we capture the time elapsed between when the record was created or 
produced and when it was consumed by the extractor.
+        if (recordCreationTimestamp > 0) {
+          v.creationDurationTime = System.currentTimeMillis() - 
recordCreationTimestamp;        
+        }
       }
       return v;
     });
@@ -456,6 +471,8 @@ public class KafkaExtractorStatsTracker {
     tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT, 
Long.toString(stats.getDecodingErrorCount()));
     tagsForPartition.put(NULL_RECORD_COUNT, 
Long.toString(stats.getNullRecordCount()));
     tagsForPartition.put(LAST_RECORD_HEADER_TIMESTAMP, 
Long.toString(stats.getLastSuccessfulRecordHeaderTimestamp()));
+    tagsForPartition.put(APPEND_DURATION_TIME, 
Long.toString(stats.getAppendDurationTime()));
+    tagsForPartition.put(CREATION_DURATION_TIME, 
Long.toString(stats.getCreationDurationTime()));
 
     // Commit avg time to pull a record for each partition
     double avgMillis = stats.getAvgMillisPerRecord();

Reply via email to