This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 85b0a1e57 [GOBBLIN-1830] Improving Container Transition Tracking in
Streaming Data Ingestion (#3693)
85b0a1e57 is described below
commit 85b0a1e57402377f2baa35b9f7d0ca9a36d44a1a
Author: Zihan Li <[email protected]>
AuthorDate: Fri May 19 14:35:15 2023 -0700
[GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data
Ingestion (#3693)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1830]Improving Container Transition Tracking in Streaming Data
Ingestion
* emit event with a different name
* remove unnecessary log
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../extract/kafka/KafkaExtractorStatsTracker.java | 17 +++++++++++++++++
.../extract/kafka/KafkaStreamingExtractor.java | 8 ++++++++
2 files changed, 25 insertions(+)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index b1bf19788..8ee8e5e8c 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.util.TaskEventMetadataUtils;
@@ -56,6 +57,7 @@ public class KafkaExtractorStatsTracker {
private static final String EMPTY_STRING = "";
private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
+ private static final String KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME
= "KafkaExtractorContainerTransitionEvent";
private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME =
"KafkaExtractorTopicMetadata";
private static final String LOW_WATERMARK = "lowWatermark";
private static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
@@ -497,6 +499,21 @@ public class KafkaExtractorStatsTracker {
}
}
+ /**
+ * Emit Tracking events reporting the topic partition information this
extractor handled to be consumed by a monitoring application.
+ * @param context the current {@link MetricContext}
+ */
+ public void submitEventToIndicateContainerTransition(MetricContext context) {
+ for (int i = 0; i < this.partitions.size(); i++) {
+ KafkaPartition partitionKey = this.partitions.get(i);
+ GobblinEventBuilder gobblinEventBuilder = new
GobblinEventBuilder(KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME,
GOBBLIN_KAFKA_NAMESPACE);
+ gobblinEventBuilder.addMetadata(TOPIC, partitionKey.getTopicName());
+ gobblinEventBuilder.addMetadata(PARTITION,
Integer.toString(partitionKey.getId()));
+
gobblinEventBuilder.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME));
+ EventSubmitter.submit(context, gobblinEventBuilder);
+ }
+ }
+
/**
* A helper function to merge tags for KafkaPartition. Separate into a
package-private method for ease of testing.
*/
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 87658e09c..3fa4d4c28 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -246,6 +246,8 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
state.getPropAsLong(KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY,
DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES) * 60 *
1000;
resetExtractorStatsAndWatermarks(true);
+ //Even though we haven't start ingesting yet, emit event to indicate the
container transition.
+ submitEventToIndicateContainerTransition();
//Schedule a thread for reporting Kafka consumer metrics
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
@@ -271,6 +273,12 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT,
TimeUnit.MILLISECONDS.name()));
}
+ private void submitEventToIndicateContainerTransition() {
+ if (this.isInstrumentationEnabled()) {
+
this.statsTracker.submitEventToIndicateContainerTransition(getMetricContext());
+ }
+ }
+
private Map<KafkaPartition, LongWatermark>
getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
List<String> topicPartitionStrings =
topicPartitions.stream().map(topicPartition ->
topicPartition.toString()).collect(Collectors.toList());