This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9389a4b [GOBBLIN-876][GOBBLIN-846] Expose metrics() API in
GobblinKafkaConsumerClient to al…
9389a4b is described below
commit 9389a4b2dfcfc08ad05b34182e341d32b198d97c
Author: sv2000 <[email protected]>
AuthorDate: Thu Sep 26 10:15:52 2019 -0700
[GOBBLIN-876][GOBBLIN-846] Expose metrics() API in
GobblinKafkaConsumerClient to al…
Closes #2730 from sv2000/kafkaConsumerMetrics
---
.../kafka/client/Kafka09ConsumerClient.java | 37 ++++++++++++++++
.../client/AbstractBaseKafkaConsumerClient.java | 50 +++++++++++++++++-----
.../kafka/client/GobblinKafkaConsumerClient.java | 13 ++++++
3 files changed, 89 insertions(+), 11 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 5ab27e0..97581c6 100644
---
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -18,8 +18,10 @@ package org.apache.gobblin.kafka.client;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
@@ -27,9 +29,13 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -43,6 +49,7 @@ import com.typesafe.config.ConfigFactory;
import javax.annotation.Nonnull;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
@@ -58,6 +65,7 @@ import org.apache.gobblin.util.ConfigUtils;
* @param <K> Message key type
* @param <V> Message value type
*/
+@Slf4j
public class Kafka09ConsumerClient<K, V> extends
AbstractBaseKafkaConsumerClient {
private static final String KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY =
"bootstrap.servers";
@@ -163,6 +171,35 @@ public class Kafka09ConsumerClient<K, V> extends
AbstractBaseKafkaConsumerClient
}
@Override
+ public Map<String, Metric> getMetrics() {
+   // The cast assumes every value reported by the consumer is a KafkaMetric.
+   final Map<MetricName, KafkaMetric> consumerMetrics =
+       (Map<MetricName, KafkaMetric>) this.consumer.metrics();
+
+   // Re-key each Kafka metric by its canonical name and expose it as a Coda Hale metric.
+   final Map<String, Metric> metricsByName = new HashMap<>();
+   for (KafkaMetric consumerMetric : consumerMetrics.values()) {
+     metricsByName.put(canonicalMetricName(consumerMetric), kafkaToCodaHaleMetric(consumerMetric));
+   }
+   return metricsByName;
+ }
+
+ /**
+ * Wraps a {@link KafkaMetric} in a Coda Hale {@link Gauge}. The gauge does not snapshot the value:
+ * every read of the gauge calls {@link KafkaMetric#value()} again.
+ *
+ * @param kafkaMetric the Kafka metric to expose
+ * @return a {@link Gauge} of {@code Double} backed by {@code kafkaMetric}
+ */
+ private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
+   if (log.isDebugEnabled()) {
+     final String metricDescription = kafkaMetric.metricName().toString();
+     log.debug("Processing a metric change for {}", metricDescription);
+   }
+   final Gauge<Double> liveValue = () -> Double.valueOf(kafkaMetric.value());
+   return liveValue;
+ }
+
+ /**
+ * Builds the canonical name of a {@link KafkaMetric} from the group, tag values and name of its
+ * {@link MetricName}.
+ *
+ * @param kafkaMetric the metric whose name is canonicalized
+ * @return the name produced by the three-argument {@code canonicalMetricName} overload
+ */
+ private String canonicalMetricName(KafkaMetric kafkaMetric) {
+   final MetricName metricId = kafkaMetric.metricName();
+   final String group = metricId.group();
+   final Map<String, String> tags = metricId.tags();
+   return canonicalMetricName(group, tags.values(), metricId.name());
+ }
+
+
+ @Override
public void close() throws IOException {
// Closes the consumer held in this.consumer.
this.consumer.close();
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 24c737f..63d81fe 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -16,24 +16,23 @@
*/
package org.apache.gobblin.kafka.client;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
-import org.apache.gobblin.util.DatasetFilterUtils;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.regex.Pattern;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import javax.annotation.Nonnull;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
-import java.util.Map;
-import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.apache.gobblin.util.DatasetFilterUtils;
/**
@@ -85,6 +84,35 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
}
/**
+ * A helper method that returns the canonical metric name for a kafka metric. The metric group, the tag
+ * values (in iteration order) and the metric name are concatenated, and every space and every dot in the
+ * result is replaced with an underscore. A typical canonicalized metric name would be of the following
+ * format: "{metric-group}_{client-id}_{metric-name}". This method is invoked in
+ * {@link GobblinKafkaConsumerClient#getMetrics()} implementations to convert KafkaMetric names to a
+ * Coda Hale metric name. Note that the canonicalization is done on every invocation of the
+ * {@link GobblinKafkaConsumerClient#getMetrics()} API.
+ *
+ * @param metricGroup the type of the Kafka metric e.g."consumer-fetch-manager-metrics",
+ *        "consumer-coordinator-metrics" etc.
+ * @param metricTags any tags associated with the Kafka metric, typically include the kafka client id,
+ *        topic name, partition number etc. Must not be null.
+ * @param metricName the name of the Kafka metric e.g. "records-lag-max", "fetch-throttle-time-max" etc.
+ * @return the canonicalized metric name.
+ */
+ protected String canonicalMetricName(String metricGroup, Collection<String> metricTags, String metricName) {
+   List<String> nameParts = new ArrayList<>();
+   nameParts.add(metricGroup);
+   nameParts.addAll(metricTags);
+   nameParts.add(metricName);
+
+   // String#replace matches its target literally (it is not a regex), so the dot must be passed
+   // unescaped; an escaped "\\." would look for a backslash followed by a dot and never match.
+   return String.join(".", nameParts).replace(' ', '_').replace('.', '_');
+ }
+
+
+ /**
* Get a list of all kafka topics
*/
public abstract List<KafkaTopic> getTopics();
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index a23e557..f94530b 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -19,8 +19,11 @@ package org.apache.gobblin.kafka.client;
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
+import com.codahale.metrics.Metric;
+import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
@@ -87,6 +90,16 @@ public interface GobblinKafkaConsumerClient extends
Closeable {
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long
nextOffset, long maxOffset);
/**
+ * Returns the metrics of the underlying Kafka consumer. The individual implementations must translate
+ * org.apache.kafka.common.Metric to Coda Hale Metrics. A typical use case for reporting the consumer
+ * metrics will call this method inside a scheduled thread.
+ *
+ * @return a map of Coda Hale {@link Metric}s keyed by name; this default implementation returns a new,
+ *         empty map on every call
+ */
+ public default Map<String, Metric> getMetrics() {
+   final Map<String, Metric> noMetrics = Maps.newHashMap();
+   return noMetrics;
+ }
+
+ /**
* A factory to create {@link GobblinKafkaConsumerClient}s
*/
public interface GobblinKafkaConsumerClientFactory {