This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e6fa4901 [GOBBLIN-1882] Add brokerName in metric emitted from 
KafkaStreamingExtractor  (#3745)
4e6fa4901 is described below

commit 4e6fa49015c9b1222b531de8386415c8b36038e6
Author: Zihan Li <[email protected]>
AuthorDate: Mon Aug 21 10:37:13 2023 -0700

    [GOBBLIN-1882] Add brokerName in metric emitted from 
KafkaStreamingExtractor  (#3745)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1882] Add brokerName in metric emitted from 
KafkaStreamingExtractor
    
    * add unit tests
    
    * remove unintentional change
    
    * address comment
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../src/main/java/org/apache/gobblin/KafkaCommonUtil.java    | 12 +++++++-----
 .../source/extractor/extract/kafka/KafkaExtractor.java       |  9 +++++----
 .../extractor/extract/kafka/KafkaExtractorStatsTracker.java  |  5 +++++
 .../test/java/org/apache/gobblin/KafkaCommonUtilTest.java    |  2 +-
 .../extract/kafka/KafkaExtractorStatsTrackerTest.java        |  2 ++
 .../source/extractor/extract/kafka/KafkaExtractorTest.java   |  4 ++--
 .../extractor/extract/kafka/KafkaProduceRateTrackerTest.java |  3 +++
 7 files changed, 25 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
index e85aed1c5..7634e35fc 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
@@ -27,14 +27,14 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.State;
 
 import static 
org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
 
-
+@Slf4j
 public class KafkaCommonUtil {
   public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
   public static final String MAP_KEY_VALUE_DELIMITER_KEY = "->";
@@ -72,10 +72,12 @@ public class KafkaCommonUtil {
   }
 
   public static Map<String, String> getKafkaBrokerToSimpleNameMap(State state) 
{
-    
Preconditions.checkArgument(state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
-        String.format("Configuration must contain value for %s", 
KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY));
-    String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
     Map<String, String> kafkaBrokerUriToSimpleName = new HashMap<>();
+    if (!state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY)) {
+        log.warn("Configuration does not contain value for {}", 
KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
+        return kafkaBrokerUriToSimpleName;
+    }
+    String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
     for (String entry : LIST_SPLITTER.splitToList(mapStr)) {
       String[] items = entry.trim().split(MAP_KEY_VALUE_DELIMITER_KEY);
       kafkaBrokerUriToSimpleName.put(items[0], items[1]);
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 9a29411c2..e95e98842 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -354,10 +354,11 @@ public abstract class KafkaExtractor<S, D> extends 
EventBasedExtractor<S, D> {
     String brokerUri = kafkaBrokerUriList.get(0);
     Map<String, String> brokerToSimpleName = 
KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state);
 
-    Preconditions.checkArgument(brokerToSimpleName.get(brokerUri) != null,
-        String.format("Unable to find simple name for the kafka cluster broker 
uri in the config. Please check the map "
-            + "value of %s. brokerUri=%s, configMapValue=%s", 
KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri, brokerToSimpleName));
-
+    if (!brokerToSimpleName.containsKey(brokerUri)) {
+      LOG.warn("Unable to find simple name for the kafka cluster broker uri in 
the config. Please check the map value of {}. brokerUri={}, configMapValue=%{}",
+          KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri, brokerToSimpleName);
+      return "";
+    }
     return brokerToSimpleName.get(brokerUri);
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 8ee8e5e8c..9b4e3ae1a 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.util.TaskEventMetadataUtils;
 public class KafkaExtractorStatsTracker {
   // Constants for event submission
   public static final String TOPIC = "topic";
+  public static final String BROKER_NAME = "brokerName";
   public static final String PARTITION = "partition";
 
   private static final String EMPTY_STRING = "";
@@ -87,6 +88,7 @@ public class KafkaExtractorStatsTracker {
   private final Histogram observedLatencyHistogram;
   private boolean isSlaConfigured;
   private long recordLevelSlaMillis;
+  private String brokerName;
   //Minimum partition index processed by this task. Statistics that are 
aggregated across all partitions (e.g. observed latency histogram)
   // processed by the task are reported against this partition index.
   private int minPartitionIdx = Integer.MAX_VALUE;
@@ -110,6 +112,7 @@ public class KafkaExtractorStatsTracker {
 
   public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> 
partitions) {
     this.workUnitState = state;
+    this.brokerName = KafkaExtractor.getKafkaBrokerSimpleName(workUnitState);
     this.partitions = partitions;
     this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
     this.partitions.forEach(partition -> {
@@ -495,6 +498,7 @@ public class KafkaExtractorStatsTracker {
     for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : 
tagsForPartitionsMap.entrySet()) {
       EventSubmitter.Builder eventSubmitterBuilder = new 
EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
       
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
 KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
+      eventSubmitterBuilder.addMetadata(BROKER_NAME, this.brokerName);
       
eventSubmitterBuilder.build().submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, 
eventTags.getValue());
     }
   }
@@ -508,6 +512,7 @@ public class KafkaExtractorStatsTracker {
       KafkaPartition partitionKey = this.partitions.get(i);
       GobblinEventBuilder gobblinEventBuilder = new 
GobblinEventBuilder(KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME, 
GOBBLIN_KAFKA_NAMESPACE);
       gobblinEventBuilder.addMetadata(TOPIC, partitionKey.getTopicName());
+      gobblinEventBuilder.addMetadata(BROKER_NAME, this.brokerName);
       gobblinEventBuilder.addMetadata(PARTITION, 
Integer.toString(partitionKey.getId()));
       
gobblinEventBuilder.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
 KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME));
       EventSubmitter.submit(context, gobblinEventBuilder);
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
index 273e41481..c40692cd1 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
@@ -34,7 +34,7 @@ public class KafkaCommonUtilTest {
     String simpleName = "some-identifier";
 
     State state = new State();
-    Assert.assertThrows(IllegalArgumentException.class, () -> 
KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state));
+    
Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state).size(),0);
 
     state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, 
String.format("%s->%s", brokerUri, simpleName));
     Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state),
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index bef77d892..035fd5b6d 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.HistogramLogReader;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -50,6 +51,7 @@ public class KafkaExtractorStatsTrackerTest {
     this.workUnitState = new WorkUnitState();
     this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
     
this.workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, 
true);
+    this.workUnitState.setProp(ConfigurationKeys.KAFKA_BROKERS, "testBroker");
     this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, 
kafkaPartitions);
   }
 
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
index 7f1b27e58..782e9c661 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
@@ -40,10 +40,10 @@ public class KafkaExtractorTest {
     final String kafkaBrokerUri = "kafka.broker.uri.com:12345";
     final String kafkaBrokerSimpleName = "simple.kafka.name";
     state.setProp(ConfigurationKeys.KAFKA_BROKERS, kafkaBrokerUri);
-    Assert.assertThrows(IllegalArgumentException.class, () -> 
KafkaExtractor.getKafkaBrokerSimpleName(state));
+    Assert.assertEquals("", KafkaExtractor.getKafkaBrokerSimpleName(state));
 
     state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, 
String.format("foobar->foobarId", kafkaBrokerUri, kafkaBrokerSimpleName));
-    Assert.assertThrows(IllegalArgumentException.class, () -> 
KafkaExtractor.getKafkaBrokerSimpleName(state));
+    Assert.assertEquals("", KafkaExtractor.getKafkaBrokerSimpleName(state));
 
     state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, 
String.format("%s->%s,foobar->foobarId", kafkaBrokerUri, 
kafkaBrokerSimpleName));
     Assert.assertEquals(KafkaExtractor.getKafkaBrokerSimpleName(state), 
kafkaBrokerSimpleName);
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
index bf8d37f02..097c26210 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.joda.time.LocalDate;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -58,6 +59,7 @@ public class KafkaProduceRateTrackerTest {
     kafkaPartitions.add(new 
KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
     this.workUnitState = new WorkUnitState();
     this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+    this.workUnitState.setProp(ConfigurationKeys.KAFKA_BROKERS, "testBroker");
     this.watermarkTracker = new LastWatermarkTracker(false);
     this.extractorStatsTracker = new 
KafkaExtractorStatsTracker(this.workUnitState, kafkaPartitions);
   }
@@ -134,6 +136,7 @@ public class KafkaProduceRateTrackerTest {
     
workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY,
 false);
     workUnitState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
     workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+    workUnitState.setProp(ConfigurationKeys.KAFKA_BROKERS, "testBroker");
     WatermarkTracker watermarkTracker = new LastWatermarkTracker(false);
     KafkaExtractorStatsTracker extractorStatsTracker = new 
KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
 

Reply via email to