This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4e6fa4901 [GOBBLIN-1882] Add brokerName in metric emitted from
KafkaStreamingExtractor (#3745)
4e6fa4901 is described below
commit 4e6fa49015c9b1222b531de8386415c8b36038e6
Author: Zihan Li <[email protected]>
AuthorDate: Mon Aug 21 10:37:13 2023 -0700
[GOBBLIN-1882] Add brokerName in metric emitted from
KafkaStreamingExtractor (#3745)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1882] Add brokerName in metric emitted from
KafkaStreamingExtractor
* add unit tests
* remove unintentional change
* address comment
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../src/main/java/org/apache/gobblin/KafkaCommonUtil.java | 12 +++++++-----
.../source/extractor/extract/kafka/KafkaExtractor.java | 9 +++++----
.../extractor/extract/kafka/KafkaExtractorStatsTracker.java | 5 +++++
.../test/java/org/apache/gobblin/KafkaCommonUtilTest.java | 2 +-
.../extract/kafka/KafkaExtractorStatsTrackerTest.java | 2 ++
.../source/extractor/extract/kafka/KafkaExtractorTest.java | 4 ++--
.../extractor/extract/kafka/KafkaProduceRateTrackerTest.java | 3 +++
7 files changed, 25 insertions(+), 12 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
index e85aed1c5..7634e35fc 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
@@ -27,14 +27,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
import static
org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
-
+@Slf4j
public class KafkaCommonUtil {
public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
public static final String MAP_KEY_VALUE_DELIMITER_KEY = "->";
@@ -72,10 +72,12 @@ public class KafkaCommonUtil {
}
public static Map<String, String> getKafkaBrokerToSimpleNameMap(State state)
{
-
Preconditions.checkArgument(state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
- String.format("Configuration must contain value for %s",
KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY));
- String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
Map<String, String> kafkaBrokerUriToSimpleName = new HashMap<>();
+ if (!state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY)) {
+ log.warn("Configuration does not contain value for {}",
KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
+ return kafkaBrokerUriToSimpleName;
+ }
+ String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
for (String entry : LIST_SPLITTER.splitToList(mapStr)) {
String[] items = entry.trim().split(MAP_KEY_VALUE_DELIMITER_KEY);
kafkaBrokerUriToSimpleName.put(items[0], items[1]);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 9a29411c2..e95e98842 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -354,10 +354,11 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
String brokerUri = kafkaBrokerUriList.get(0);
Map<String, String> brokerToSimpleName =
KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state);
- Preconditions.checkArgument(brokerToSimpleName.get(brokerUri) != null,
- String.format("Unable to find simple name for the kafka cluster broker
uri in the config. Please check the map "
- + "value of %s. brokerUri=%s, configMapValue=%s",
KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri, brokerToSimpleName));
-
+ if (!brokerToSimpleName.containsKey(brokerUri)) {
+ LOG.warn("Unable to find simple name for the kafka cluster broker uri in
the config. Please check the map value of {}. brokerUri={}, configMapValue=%{}",
+ KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri, brokerToSimpleName);
+ return "";
+ }
return brokerToSimpleName.get(brokerUri);
}
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 8ee8e5e8c..9b4e3ae1a 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.util.TaskEventMetadataUtils;
public class KafkaExtractorStatsTracker {
// Constants for event submission
public static final String TOPIC = "topic";
+ public static final String BROKER_NAME = "brokerName";
public static final String PARTITION = "partition";
private static final String EMPTY_STRING = "";
@@ -87,6 +88,7 @@ public class KafkaExtractorStatsTracker {
private final Histogram observedLatencyHistogram;
private boolean isSlaConfigured;
private long recordLevelSlaMillis;
+ private String brokerName;
//Minimum partition index processed by this task. Statistics that are
aggregated across all partitions (e.g. observed latency histogram)
// processed by the task are reported against this partition index.
private int minPartitionIdx = Integer.MAX_VALUE;
@@ -110,6 +112,7 @@ public class KafkaExtractorStatsTracker {
public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition>
partitions) {
this.workUnitState = state;
+ this.brokerName = KafkaExtractor.getKafkaBrokerSimpleName(workUnitState);
this.partitions = partitions;
this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
this.partitions.forEach(partition -> {
@@ -495,6 +498,7 @@ public class KafkaExtractorStatsTracker {
for (Map.Entry<KafkaPartition, Map<String, String>> eventTags :
tagsForPartitionsMap.entrySet()) {
EventSubmitter.Builder eventSubmitterBuilder = new
EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
+ eventSubmitterBuilder.addMetadata(BROKER_NAME, this.brokerName);
eventSubmitterBuilder.build().submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME,
eventTags.getValue());
}
}
@@ -508,6 +512,7 @@ public class KafkaExtractorStatsTracker {
KafkaPartition partitionKey = this.partitions.get(i);
GobblinEventBuilder gobblinEventBuilder = new
GobblinEventBuilder(KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME,
GOBBLIN_KAFKA_NAMESPACE);
gobblinEventBuilder.addMetadata(TOPIC, partitionKey.getTopicName());
+ gobblinEventBuilder.addMetadata(BROKER_NAME, this.brokerName);
gobblinEventBuilder.addMetadata(PARTITION,
Integer.toString(partitionKey.getId()));
gobblinEventBuilder.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME));
EventSubmitter.submit(context, gobblinEventBuilder);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
index 273e41481..c40692cd1 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
@@ -34,7 +34,7 @@ public class KafkaCommonUtilTest {
String simpleName = "some-identifier";
State state = new State();
- Assert.assertThrows(IllegalArgumentException.class, () ->
KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state));
+
Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state).size(),0);
state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
String.format("%s->%s", brokerUri, simpleName));
Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state),
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index bef77d892..035fd5b6d 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogReader;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -50,6 +51,7 @@ public class KafkaExtractorStatsTrackerTest {
this.workUnitState = new WorkUnitState();
this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
this.workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED,
true);
+ this.workUnitState.setProp(ConfigurationKeys.KAFKA_BROKERS, "testBroker");
this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState,
kafkaPartitions);
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
index 7f1b27e58..782e9c661 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
@@ -40,10 +40,10 @@ public class KafkaExtractorTest {
final String kafkaBrokerUri = "kafka.broker.uri.com:12345";
final String kafkaBrokerSimpleName = "simple.kafka.name";
state.setProp(ConfigurationKeys.KAFKA_BROKERS, kafkaBrokerUri);
- Assert.assertThrows(IllegalArgumentException.class, () ->
KafkaExtractor.getKafkaBrokerSimpleName(state));
+ Assert.assertEquals("", KafkaExtractor.getKafkaBrokerSimpleName(state));
state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
String.format("foobar->foobarId", kafkaBrokerUri, kafkaBrokerSimpleName));
- Assert.assertThrows(IllegalArgumentException.class, () ->
KafkaExtractor.getKafkaBrokerSimpleName(state));
+ Assert.assertEquals("", KafkaExtractor.getKafkaBrokerSimpleName(state));
state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
String.format("%s->%s,foobar->foobarId", kafkaBrokerUri,
kafkaBrokerSimpleName));
Assert.assertEquals(KafkaExtractor.getKafkaBrokerSimpleName(state),
kafkaBrokerSimpleName);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
index bf8d37f02..097c26210 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.joda.time.LocalDate;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -58,6 +59,7 @@ public class KafkaProduceRateTrackerTest {
kafkaPartitions.add(new
KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
this.workUnitState = new WorkUnitState();
this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+ this.workUnitState.setProp(ConfigurationKeys.KAFKA_BROKERS, "testBroker");
this.watermarkTracker = new LastWatermarkTracker(false);
this.extractorStatsTracker = new
KafkaExtractorStatsTracker(this.workUnitState, kafkaPartitions);
}
@@ -134,6 +136,7 @@ public class KafkaProduceRateTrackerTest {
workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY,
false);
workUnitState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+ workUnitState.setProp(ConfigurationKeys.KAFKA_BROKERS, "testBroker");
WatermarkTracker watermarkTracker = new LastWatermarkTracker(false);
KafkaExtractorStatsTracker extractorStatsTracker = new
KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);