This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 077a10ac41 [GOBBLIN-2114] Add fields to show computed delta between
consume and … (#4005)
077a10ac41 is described below
commit 077a10ac4182ad64865eb4985df15ee08de9a971
Author: Wei-Hsiang (Max) Lin <[email protected]>
AuthorDate: Thu Jul 18 14:53:15 2024 -0700
[GOBBLIN-2114] Add fields to show computed delta between consume and …
(#4005)
* [GOBBLIN-2114] Add fields to show computed delta between consume and
producer/append time respectively
---
.../extract/kafka/KafkaExtractorStatsTracker.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index eec0d09c46..d88f026bb7 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -79,6 +79,8 @@ public class KafkaExtractorStatsTracker {
private static final String FETCH_MESSAGE_BUFFER_TIME =
"fetchMessageBufferTime";
private static final String LAST_RECORD_HEADER_TIMESTAMP =
"lastRecordHeaderTimestamp";
private static final String OBSERVED_LATENCY_HISTOGRAM =
"observedLatencyHistogram";
+ private static final String CREATION_DURATION_TIME = "creationDurationTime";
+ private static final String APPEND_DURATION_TIME = "appendDurationTime";
@Getter
private final Map<KafkaPartition, ExtractorStats> statsMap;
@@ -183,6 +185,8 @@ public class KafkaExtractorStatsTracker {
private long minLogAppendTime = -1L;
private long maxLogAppendTime = -1L;
private long minRecordCreationTime = -1L;
+ private long creationDurationTime = -1L;
+ private long appendDurationTime = -1L;
}
/**
@@ -306,6 +310,17 @@ public class KafkaExtractorStatsTracker {
if (logAppendTimestamp > 0 && (System.currentTimeMillis() -
logAppendTimestamp > recordLevelSlaMillis)) {
v.slaMissedRecordCount++;
}
+ // This experiment tracks the SLA for record creation times after
migrating to Xinfra.
+ // It compares the time taken for appending records versus the time
taken for creating records.
+ // The goal is to identify any potential impact on the SLA.
+ // Here we capture the time elapsed from when the record was appended until
it was consumed by the extractor.
+ if (logAppendTimestamp > 0) {
+ v.appendDurationTime = System.currentTimeMillis() -
logAppendTimestamp;
+ }
+ // Here we capture the time elapsed from when the record was created or
produced until it was consumed by the extractor.
+ if (recordCreationTimestamp > 0) {
+ v.creationDurationTime = System.currentTimeMillis() -
recordCreationTimestamp;
+ }
}
return v;
});
@@ -456,6 +471,8 @@ public class KafkaExtractorStatsTracker {
tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT,
Long.toString(stats.getDecodingErrorCount()));
tagsForPartition.put(NULL_RECORD_COUNT,
Long.toString(stats.getNullRecordCount()));
tagsForPartition.put(LAST_RECORD_HEADER_TIMESTAMP,
Long.toString(stats.getLastSuccessfulRecordHeaderTimestamp()));
+ tagsForPartition.put(APPEND_DURATION_TIME,
Long.toString(stats.getAppendDurationTime()));
+ tagsForPartition.put(CREATION_DURATION_TIME,
Long.toString(stats.getCreationDurationTime()));
// Commit avg time to pull a record for each partition
double avgMillis = stats.getAvgMillisPerRecord();