This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9389a4b  [GOBBLIN-876][GOBBLIN-846] Expose metrics() API in 
GobblinKafkaConsumerClient to al…
9389a4b is described below

commit 9389a4b2dfcfc08ad05b34182e341d32b198d97c
Author: sv2000 <[email protected]>
AuthorDate: Thu Sep 26 10:15:52 2019 -0700

    [GOBBLIN-876][GOBBLIN-846] Expose metrics() API in 
GobblinKafkaConsumerClient to al…
    
    Closes #2730 from sv2000/kafkaConsumerMetrics
---
 .../kafka/client/Kafka09ConsumerClient.java        | 37 ++++++++++++++++
 .../client/AbstractBaseKafkaConsumerClient.java    | 50 +++++++++++++++++-----
 .../kafka/client/GobblinKafkaConsumerClient.java   | 13 ++++++
 3 files changed, 89 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 5ab27e0..97581c6 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -18,8 +18,10 @@ package org.apache.gobblin.kafka.client;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 
@@ -27,9 +29,13 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -43,6 +49,7 @@ import com.typesafe.config.ConfigFactory;
 import javax.annotation.Nonnull;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
@@ -58,6 +65,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * @param <K> Message key type
  * @param <V> Message value type
  */
+@Slf4j
 public class Kafka09ConsumerClient<K, V> extends 
AbstractBaseKafkaConsumerClient {
 
   private static final String KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY = 
"bootstrap.servers";
@@ -163,6 +171,35 @@ public class Kafka09ConsumerClient<K, V> extends 
AbstractBaseKafkaConsumerClient
   }
 
   @Override
+  public Map<String, Metric> getMetrics() {
+    // The consumer reports its metrics keyed by MetricName; the values are treated as KafkaMetric instances.
+    Map<MetricName, KafkaMetric> consumerMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
+    Map<String, Metric> translated = new HashMap<>();
+
+    // Re-key every metric by its canonical name and wrap it as a Coda Hale metric.
+    for (Entry<MetricName, KafkaMetric> entry : consumerMetrics.entrySet()) {
+      KafkaMetric consumerMetric = entry.getValue();
+      translated.put(canonicalMetricName(consumerMetric), kafkaToCodaHaleMetric(consumerMetric));
+    }
+    return translated;
+  }
+
+  /**
+   * Wraps a {@link KafkaMetric} in a Coda Hale {@link Gauge}.
+   *
+   * @param kafkaMetric the Kafka metric to expose
+   * @return a {@link Gauge} that calls {@link KafkaMetric#value()} each time the gauge is read
+   */
+  private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
+    if (log.isDebugEnabled()) {
+      log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
+    }
+    // value() is not called here; the lambda defers it until the gauge is queried.
+    return (Gauge<Double>) () -> kafkaMetric.value();
+  }
+
+  private String canonicalMetricName(KafkaMetric kafkaMetric) {
+    MetricName name = kafkaMetric.metricName();
+    return canonicalMetricName(name.group(), name.tags().values(), 
name.name());
+  }
+
+
+  @Override
   public void close() throws IOException {
     this.consumer.close();
   }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 24c737f..63d81fe 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -16,24 +16,23 @@
  */
 package org.apache.gobblin.kafka.client;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
-import org.apache.gobblin.util.DatasetFilterUtils;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import javax.annotation.Nonnull;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 import org.apache.gobblin.util.ConfigUtils;
-import java.util.Map;
-import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.apache.gobblin.util.DatasetFilterUtils;
 
 
 /**
@@ -85,6 +84,35 @@ public abstract class AbstractBaseKafkaConsumerClient 
implements GobblinKafkaCon
   }
 
   /**
+   * A helper method that returns the canonical metric name for a Kafka metric. The metric group, the tag values and
+   * the metric name are joined with underscores, and every space or dot inside a part is replaced by an underscore
+   * as well, so a typical canonicalized metric name has the format "{metric-group}_{client-id}_{metric-name}".
+   * This method is invoked in {@link GobblinKafkaConsumerClient#getMetrics()} implementations to convert KafkaMetric
+   * names to a Coda Hale metric name. Note that the canonicalization is done on every invocation of the
+   * {@link GobblinKafkaConsumerClient#getMetrics()} API.
+   * @param metricGroup the type of the Kafka metric e.g. "consumer-fetch-manager-metrics",
+   *                    "consumer-coordinator-metrics" etc.
+   * @param metricTags any tags associated with the Kafka metric, typically include the kafka client id, topic name,
+   *                   partition number etc. Must not be null; may be empty.
+   * @param metricName the name of the Kafka metric e.g. "records-lag-max", "fetch-throttle-time-max" etc.
+   * @return the canonicalized metric name.
+   */
+  protected String canonicalMetricName(String metricGroup, Collection<String> metricTags, String metricName) {
+    List<String> nameParts = new ArrayList<>();
+    nameParts.add(metricGroup);
+    nameParts.addAll(metricTags);
+    nameParts.add(metricName);
+
+    // String.replace matches its target literally (it is not a regex), so dots are replaced with the char overload.
+    return String.join("_", nameParts).replace(' ', '_').replace('.', '_');
+  }
+
+
+  /**
    * Get a list of all kafka topics
    */
   public abstract List<KafkaTopic> getTopics();
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index a23e557..f94530b 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -19,8 +19,11 @@ package org.apache.gobblin.kafka.client;
 import java.io.Closeable;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 
+import com.codahale.metrics.Metric;
+import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
@@ -87,6 +90,16 @@ public interface GobblinKafkaConsumerClient extends 
Closeable {
   public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long 
nextOffset, long maxOffset);
 
   /**
+   * API to return underlying Kafka consumer metrics. The individual implementations must translate
+   * org.apache.kafka.common.Metric to Coda Hale Metrics. A typical use case for reporting the consumer metrics
+   * will call this method inside a scheduled thread.
+   *
+   * @return a map from metric name to Coda Hale {@link Metric}; this default implementation returns a new,
+   *         empty map
+   */
+  public default Map<String, Metric> getMetrics() {
+    return Maps.<String, Metric>newHashMap();
+  }
+
+  /**
    * A factory to create {@link GobblinKafkaConsumerClient}s
    */
   public interface GobblinKafkaConsumerClientFactory {

Reply via email to