This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 856fbc1  [GOBBLIN-758] Added new reporters to emit MetricReport and 
GobblinTrackingEvent without serializing them. Also added random key generator 
for reporters.
856fbc1 is described below

commit 856fbc103e9fd144f062fb0ecaeba1f35f2f205b
Author: vbohra <[email protected]>
AuthorDate: Mon May 6 15:53:00 2019 -0700

    [GOBBLIN-758] Added new reporters to emit MetricReport and 
GobblinTrackingEvent without serializing them. Also added random key generator 
for reporters.
    
    Closes #2622 from vikrambohra/ETL-8675
---
 .../gobblin/configuration/ConfigurationKeys.java   |   8 +-
 .../gobblin/metrics/KafkaReportingFormats.java     | 217 +++++++++++++++------
 .../metrics/kafka/KafkaReporterFactory.java        |  57 +++---
 .../gobblin/metrics/kafka/LoggingPusher.java       |  25 ++-
 .../apache/gobblin/metrics/kafka/PusherUtils.java  |   5 +-
 .../reporter/KeyValueEventObjectReporter.java      | 161 +++++++++++++++
 .../reporter/KeyValueMetricObjectReporter.java     | 138 +++++++++++++
 .../gobblin/metrics/reporter/KeyValuePusher.java   |  30 +++
 .../gobblin/metrics/kafka/LoggingPusherTest.java   |  20 +-
 .../reporter/KeyValueEventObjectReporterTest.java  | 116 +++++++++++
 .../reporter/KeyValueMetricObjectReporterTest.java | 113 +++++++++++
 .../metrics/reporter/MockKeyValuePusher.java       |  60 ++++++
 .../java/org/apache/gobblin/util/AvroUtils.java    |  42 +++-
 .../org/apache/gobblin/util/AvroUtilsTest.java     |  31 +++
 14 files changed, 910 insertions(+), 113 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a5b56bb..b628138 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -666,7 +666,7 @@ public class ConfigurationKeys {
   public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 
METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
   public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
   public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX = 
"metrics.reporting";
-
+  public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX = 
METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
   // File-based reporting
   public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.file.enabled";
@@ -690,6 +690,7 @@ public class ConfigurationKeys {
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS = 
"org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
   public static final String METRICS_REPORTING_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
   public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
+  public static final String METRICS_REPORTING_KAFKAPUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafkaPusherKeys";
   public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
   public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
@@ -705,6 +706,11 @@ public class ConfigurationKeys {
   // Topic used only for event reporting.
   public static final String METRICS_KAFKA_TOPIC_EVENTS =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic.events";
+  // Key related configurations for raw metric and event key value reporters
+  public static final int DEFAULT_REPORTER_KEY_SIZE = 100;
+
+  public static final String METRICS_REPORTING_PUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.pusherKeys";
+  public static final String METRICS_REPORTING_EVENTS_PUSHERKEYS = 
METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".pusherKeys";
 
   //Graphite-based reporting
   public static final String METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY =
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index e9e981d..eb409ff 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -17,10 +17,13 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
@@ -28,7 +31,12 @@ import 
org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.reporter.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.reporter.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -36,68 +44,161 @@ import org.apache.gobblin.metrics.kafka.KafkaReporter;
  */
 public enum KafkaReportingFormats {
 
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
+  AVRO() {
+    @Override
+    public void buildMetricsReporter(String brokers, String topic, Properties 
properties)
+        throws IOException {
+
+      KafkaAvroReporter.Builder<?> builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.build(brokers, topic, properties);
+    }
+
+    @Override
+    public ScheduledReporter buildEventsReporter(String brokers, String topic, 
MetricContext context,
+        Properties properties)
+        throws IOException {
+
+      KafkaAvroEventReporter.Builder<?> builder = 
KafkaAvroEventReporter.Factory.forContext(context);
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? 
properties
+          .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : 
properties
+          .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, 
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, 
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+          .withFallback(ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+    }
+  }, AVRO_KEY_VALUE() {
+    @Override
+    public void buildMetricsReporter(String brokers, String topic, Properties 
properties)
+        throws IOException {
+
+      throw new IOException("Unsupported format for Metric reporting " + 
this.name());
+    }
+
+    @Override
+    public ScheduledReporter buildEventsReporter(String brokers, String topic, 
MetricContext context,
+        Properties properties)
+        throws IOException {
+
+      KafkaAvroEventKeyValueReporter.Builder<?> builder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+      if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
+        List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+            
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+        builder.withKeys(keys);
+      }
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? 
properties
+          .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : 
properties
+          .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, 
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, 
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+          .withFallback(ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+    }
+  }, JSON() {
+    @Override
+    public void buildMetricsReporter(String brokers, String topic, Properties 
properties)
+        throws IOException {
+      KafkaReporter.Builder builder = 
KafkaReporter.BuilderFactory.newBuilder();
+      builder.build(brokers, topic, properties);
+    }
+
+    @Override
+    public ScheduledReporter buildEventsReporter(String brokers, String topic, 
MetricContext context,
+        Properties properties)
+        throws IOException {
+      KafkaEventReporter.Builder builder = 
KafkaEventReporter.Factory.forContext(context);
+      //builder.withConfig(getEventsConfig(properties));
+      String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ? 
properties
+          .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) : 
properties
+          .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, 
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, 
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+          .withFallback(ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+    }
+  }, PLAIN_OBJECT() {
+    @Override
+    public void buildMetricsReporter(String brokers, String topic, Properties 
properties)
+        throws IOException {
+
+      KeyValueMetricObjectReporter.Builder builder = new 
KeyValueMetricObjectReporter.Builder();
+      
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      Config config = ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX)
+          .withFallback(allConfig);
+      builder.build(brokers, topic, config);
+    }
+
+    @Override
+    public ScheduledReporter buildEventsReporter(String brokers, String topic, 
MetricContext context,
+        Properties properties)
+        throws IOException {
+
+      KeyValueEventObjectReporter.Builder builder = new 
KeyValueEventObjectReporter.Builder(context);
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      Config config =
+          ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX)
+              .withFallback(allConfig);
+      builder.withConfig(config);
+      
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+      return builder.build(brokers, topic);
+    }
+  };
 
   /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for 
this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
+   * Method to build reporters that emit metrics. This method does not return 
anything but schedules/starts the reporter internally
+   * @param brokers Kafka broker to connect
+   * @param topic Kafka topic to publish data
+   * @param properties Properties to build configurations from
+   * @throws IOException
    */
-  public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) 
{
-    switch (this) {
-      case AVRO:
-        KafkaAvroReporter.Builder<?> builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
-        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaReporter.BuilderFactory.newBuilder();
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
-    }
-  }
+  public abstract void buildMetricsReporter(String brokers, String topic, 
Properties properties)
+      throws IOException;
 
   /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} 
for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link 
org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
+   * Method to build reporters that emit events.
+   * @param brokers Kafka broker to connect
+   * @param topic Kafka topic to publish data
+   * @param context MetricContext to report
+   * @param properties Properties to build configurations from
+   * @return an instance of the event reporter
+   * @throws IOException
    */
-  public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext 
context, Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroEventReporter.Builder<?> kafkaAvroEventReporterBuilder = 
KafkaAvroEventReporter.Factory.forContext(context);
-        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          kafkaAvroEventReporterBuilder.withSchemaRegistry(new 
KafkaAvroSchemaRegistry(properties));
-        }
-        return kafkaAvroEventReporterBuilder;
-
-      case AVRO_KEY_VALUE:
-        KafkaAvroEventKeyValueReporter.Builder<?> 
kafkaAvroEventKeyValueReporterBuilder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
-        if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
-          List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
-              
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
-          kafkaAvroEventKeyValueReporterBuilder.withKeys(keys);
-        }
-        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          kafkaAvroEventKeyValueReporterBuilder.withSchemaRegistry(new 
KafkaAvroSchemaRegistry(properties));
-        }
-        return kafkaAvroEventKeyValueReporterBuilder;
-
-      case JSON:
-        return KafkaEventReporter.Factory.forContext(context);
-
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
-    }
-  }
+  public abstract ScheduledReporter buildEventsReporter(String brokers, String 
topic, MetricContext context,
+      Properties properties)
+      throws IOException;
+
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index be95258..9f38379 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -24,13 +24,11 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
 import org.apache.gobblin.metrics.KafkaReportingFormats;
 import org.apache.gobblin.metrics.RootMetricContext;
-import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -44,18 +42,23 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return null;
     }
-    log.info("Reporting metrics to Kafka");
+    log.info("Reporting metrics & events to Kafka");
 
-    Optional<String> defaultTopic = 
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
-    Optional<String> metricsTopic = Optional.fromNullable(
-        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
-    Optional<String> eventsTopic = Optional.fromNullable(
-        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+    Optional<String> defaultTopic =
+        
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+    Optional<String> metricsTopic =
+        
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+    Optional<String> eventsTopic =
+        
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
 
     boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
-    if (metricsEnabled) log.info("Reporting metrics to Kafka");
+    if (metricsEnabled) {
+      log.info("Metrics enabled ---  Reporting metrics to Kafka");
+    }
     boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
-    if (eventsEnabled) log.info("Reporting events to Kafka");
+    if (eventsEnabled) {
+      log.info("Events enabled --- Reporting events to Kafka");
+    }
 
     try {
       
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
@@ -75,15 +78,15 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
     try {
       formatEnum = 
KafkaReportingFormats.valueOf(metricsReportingFormat.toUpperCase());
     } catch (IllegalArgumentException exception) {
-      log.warn("Kafka metrics reporting format " + metricsReportingFormat +
-          " not recognized. Will report in json format.", exception);
+      log.warn(
+          "Kafka metrics reporting format " + metricsReportingFormat + " not 
recognized. Will report in json format.",
+          exception);
       formatEnum = KafkaReportingFormats.JSON;
     }
 
     if (metricsEnabled) {
       try {
-        formatEnum.metricReporterBuilder(properties)
-            .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
+        formatEnum.buildMetricsReporter(brokers, 
metricsTopic.or(defaultTopic).get(), properties);
       } catch (IOException exception) {
         log.error("Failed to create Kafka metrics reporter. Will not report 
metrics to Kafka.", exception);
       }
@@ -96,7 +99,8 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
       try {
         eventFormatEnum = 
KafkaReportingFormats.valueOf(eventsReportingFormat.toUpperCase());
       } catch (IllegalArgumentException exception) {
-        log.warn("Kafka events reporting format " + eventsReportingFormat + " 
not recognized. Will report in json format.",
+        log.warn(
+            "Kafka events reporting format " + eventsReportingFormat + " not 
recognized. Will report in json format.",
             exception);
         eventFormatEnum = KafkaReportingFormats.JSON;
       }
@@ -106,31 +110,16 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
 
     if (eventsEnabled) {
       try {
-        KafkaEventReporter.Builder<?> builder = 
eventFormatEnum.eventReporterBuilder(RootMetricContext.get(),
-            properties);
-
-        Config allConfig = ConfigUtils.propertiesToConfig(properties);
-        // the kafka configuration is composed of the metrics reporting 
specific keys with a fallback to the shared
-        // kafka config
-        Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
-            
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
-            ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
-
-        builder.withConfig(kafkaConfig);
-
-        String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
-            ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
-            : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
-                PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
-        builder.withPusherClassName(pusherClassName);
+        String eventTopic = eventsTopic.or(defaultTopic).get();
+        ScheduledReporter reporter =
+            eventFormatEnum.buildEventsReporter(brokers, eventTopic, 
RootMetricContext.get(), properties);
 
-        return builder.build(brokers, eventsTopic.or(defaultTopic).get());
+        return reporter;
       } catch (IOException exception) {
         log.error("Failed to create Kafka events reporter. Will not report 
events to Kafka.", exception);
       }
     }
 
-    log.info("Will start reporting metrics to Kafka");
     return null;
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
index b86287e..7033a3a 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
@@ -19,20 +19,24 @@ package org.apache.gobblin.metrics.kafka;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.reporter.KeyValuePusher;
 import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
+
 /**
  * This is a {@Pusher} class that logs the messages
- * @param <M> message type
+ * @param <V> message type
  */
 @Slf4j
-public class LoggingPusher<M> implements Pusher<M> {
+public class LoggingPusher<K, V> implements KeyValuePusher<K, V> {
   private final String brokers;
   private final String topic;
   private static final String KAFKA_TOPIC = "kafka.topic";
@@ -56,13 +60,22 @@ public class LoggingPusher<M> implements Pusher<M> {
     this.topic = topic;
   }
 
-  public void pushMessages(List<M> messages) {
-    for (M message: messages) {
-      log.info("Pushing to {}:{}: {}", this.brokers, this.topic, 
message.toString());
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  @Override
+  public void pushKeyValueMessages(List<Pair<K, V>> messages) {
+    for (Pair<K, V> message : messages) {
+      log.info("Pushing to {}:{}: {} - {}", this.brokers, this.topic, 
message.getKey(), message.getValue().toString());
     }
   }
 
   @Override
-  public void close() throws IOException {
+  public void pushMessages(List<V> messages) {
+    for (V message : messages) {
+      log.info("Pushing to {}:{}: {}", this.brokers, this.topic, 
message.toString());
+    }
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
index 0d9b2a5..426b5ee 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
@@ -21,11 +21,13 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+
 public class PusherUtils {
   public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = 
"metrics.reporting.kafka.config";
   public static final String KAFKA_PUSHER_CLASS_NAME_KEY = 
"metrics.reporting.kafkaPusherClass";
   public static final String KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS = 
"metrics.reporting.events.kafkaPusherClass";
   public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = 
"org.apache.gobblin.metrics.kafka.KafkaPusher";
+  public static final String DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME = 
"org.apache.gobblin.metrics.kafka.LoggingPusher";
 
   /**
    * Create a {@link Pusher}
@@ -39,8 +41,7 @@ public class PusherUtils {
     try {
       Class<?> pusherClass = Class.forName(pusherClassName);
 
-     return (Pusher) 
GobblinConstructorUtils.invokeLongestConstructor(pusherClass,
-          brokers, topic, config);
+      return (Pusher) 
GobblinConstructorUtils.invokeLongestConstructor(pusherClass, brokers, topic, 
config);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException("Could not instantiate kafka pusher", e);
     }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
new file mode 100644
index 0000000..fb4e006
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.StringJoiner;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * This is a raw event (GobblinTrackingEvent) key value reporter that reports 
events as GenericRecords without serialization
+ * Configuration for this reporter start with the prefix 
"metrics.reporting.events"
+ */
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+  private static final String PUSHER_CLASS = "pusherClass";
+  private static final String PUSHER_KEYS = "pusherKeys";
+  private static final String KEY_DELIMITER = ",";
+  private static final String KEY_SIZE_KEY = "keySize";
+
+  protected List<String> keys;
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder) {
+    super(builder);
+
+    this.topic = builder.topic;
+    this.namespaceOverride = builder.namespaceOverride;
+
+    Config config = builder.config.get();
+    Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
+    String pusherClassName =
+        ConfigUtils.getString(config, PUSHER_CLASS, 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+    this.pusher = (KeyValuePusher) PusherUtils
+        .getPusher(pusherClassName, builder.brokers, builder.topic, 
Optional.of(pusherConfig));
+    this.closer.register(this.pusher);
+
+    randomKey = String.valueOf(
+        new Random().nextInt(ConfigUtils.getInt(config, KEY_SIZE_KEY, 
ConfigurationKeys.DEFAULT_REPORTER_KEY_SIZE)));
+    if (config.hasPath(PUSHER_KEYS)) {
+      List<String> keys = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(PUSHER_KEYS));
+      this.keys = keys;
+    } else {
+      log.info(
+          "Key not assigned from config. Please set it with property {} Using 
randomly generated number {} as key ",
+          ConfigurationKeys.METRICS_REPORTING_EVENTS_PUSHERKEYS, randomKey);
+    }
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    log.info("Emitting report using KeyValueEventObjectReporter");
+
+    List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+    GobblinTrackingEvent event;
+
+    while (null != (event = queue.poll())) {
+      GenericRecord record = AvroUtils.overrideNameAndNamespace(event, 
this.topic, this.namespaceOverride);
+      events.add(Pair.of(buildKey(record), record));
+    }
+
+    if (!events.isEmpty()) {
+      this.pusher.pushKeyValueMessages(events);
+    }
+  }
+
+  private String buildKey(GenericRecord record) {
+
+    String key = randomKey;
+    if (this.keys != null && this.keys.size() > 0) {
+      StringJoiner joiner = new StringJoiner(KEY_DELIMITER);
+      for (String keyPart : keys) {
+        Optional value = AvroUtils.getFieldValue(record, keyPart);
+        if (value.isPresent()) {
+          joiner.add(value.get().toString());
+        } else {
+          log.info("{} not found in the GobblinTrackingEvent. Setting key to 
{}", keyPart, key);
+          return key;
+        }
+      }
+
+      key = joiner.toString();
+    }
+
+    return key;
+  }
+
+  public static class Builder extends EventReporter.Builder<Builder> {
+
+    protected String brokers;
+    protected String topic;
+    protected Optional<Config> config = Optional.absent();
+    protected Optional<Map<String, String>> namespaceOverride = 
Optional.absent();
+
+    public Builder(MetricContext context) {
+      super(context);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    /**
+     * Set additional configuration.
+     */
+    public Builder withConfig(Config config) {
+      this.config = Optional.of(config);
+      return self();
+    }
+
+    public Builder namespaceOverride(Optional<Map<String, String>> 
namespaceOverride) {
+      this.namespaceOverride = namespaceOverride;
+      return self();
+    }
+
+    public KeyValueEventObjectReporter build(String brokers, String topic) {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KeyValueEventObjectReporter(this);
+    }
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
new file mode 100644
index 0000000..96372c0
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringJoiner;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * This is a raw metric (MetricReport) key value reporter that reports metrics 
as GenericRecords without serialization
+ * Configuration for this reporter start with the prefix "metrics.reporting"
+ */
+@Slf4j
+public class KeyValueMetricObjectReporter extends MetricReportReporter {
+
+  private static final String PUSHER_CONFIG = "pusherConfig";
+  private static final String PUSHER_CLASS = "pusherClass";
+  private static final String PUSHER_KEYS = "pusherKeys";
+  private static final String KEY_DELIMITER = ",";
+  private static final String KEY_SIZE_KEY = "keySize";
+
+  private List<String> keys;
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  private Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueMetricObjectReporter(Builder builder, Config config) {
+    super(builder, config);
+
+    this.topic = builder.topic;
+    this.namespaceOverride = builder.namespaceOverride;
+
+    Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
+    String pusherClassName =
+        ConfigUtils.getString(config, PUSHER_CLASS, 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+    this.pusher = (KeyValuePusher) PusherUtils
+        .getPusher(pusherClassName, builder.brokers, builder.topic, 
Optional.of(pusherConfig));
+    this.closer.register(this.pusher);
+
+    randomKey = String.valueOf(
+        new Random().nextInt(ConfigUtils.getInt(config, KEY_SIZE_KEY, 
ConfigurationKeys.DEFAULT_REPORTER_KEY_SIZE)));
+    if (config.hasPath(PUSHER_KEYS)) {
+      List<String> keys = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(PUSHER_KEYS));
+      this.keys = keys;
+    } else {
+      log.info(
+          "Key not assigned from config. Please set it with property {} Using 
randomly generated number {} as key ",
+          ConfigurationKeys.METRICS_REPORTING_PUSHERKEYS, randomKey);
+    }
+  }
+
+  @Override
+  protected void emitReport(MetricReport report) {
+    GenericRecord record = AvroUtils.overrideNameAndNamespace(report, 
this.topic, this.namespaceOverride);
+    
this.pusher.pushKeyValueMessages(Lists.newArrayList(Pair.of(buildKey(record), 
record)));
+  }
+
+  private String buildKey(GenericRecord report) {
+
+    String key = randomKey;
+    if (this.keys != null && this.keys.size() > 0) {
+
+      StringJoiner joiner = new StringJoiner(KEY_DELIMITER);
+      for (String keyPart : keys) {
+        Optional value = AvroUtils.getFieldValue(report, keyPart);
+        if (value.isPresent()) {
+          joiner.add(value.get().toString());
+        } else {
+          log.error("{} not found in the MetricReport. Setting key to {}", 
keyPart, key);
+          return key;
+        }
+      }
+
+      key = joiner.toString();
+    }
+
+    return key;
+  }
+
+  public static class Builder extends MetricReportReporter.Builder<Builder> {
+    protected String brokers;
+    protected String topic;
+    protected Optional<Map<String, String>> namespaceOverride = 
Optional.absent();
+
+    public Builder namespaceOverride(Optional<Map<String, String>> 
namespaceOverride) {
+      this.namespaceOverride = namespaceOverride;
+      return self();
+    }
+
+    public KeyValueMetricObjectReporter build(String brokers, String topic, 
Config config)
+        throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KeyValueMetricObjectReporter(this, config);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValuePusher.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValuePusher.java
new file mode 100644
index 0000000..330bfc5
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValuePusher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+
+public interface KeyValuePusher<K, V> extends Pusher<V> {
+
+  void pushKeyValueMessages(List<Pair<K, V>> messages);
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
index 3e861de..e1635ea 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
@@ -29,6 +29,8 @@ import org.testng.annotations.Test;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 
+import org.apache.gobblin.metrics.reporter.KeyValuePusher;
+
 
 @Test
 public class LoggingPusherTest {
@@ -40,22 +42,30 @@ public class LoggingPusherTest {
     Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
     logger.addAppender(testAppender);
 
-    LoggingPusher<String> loggingPusher = new LoggingPusher<String>("broker", 
"topic", Optional.absent());
+    KeyValuePusher<String, String> loggingPusher =
+        new LoggingPusher<String, String>("broker", "topic", 
Optional.absent());
 
     loggingPusher.pushMessages(ImmutableList.of("message1", "message2"));
+    
loggingPusher.pushKeyValueMessages(ImmutableList.of(org.apache.commons.lang3.tuple.Pair.of("key",
 "message3")));
 
-    Assert.assertEquals(testAppender.events.size(), 2);
+    Assert.assertEquals(testAppender.events.size(), 3);
     Assert.assertEquals(testAppender.events.get(0).getRenderedMessage(), 
"Pushing to broker:topic: message1");
     Assert.assertEquals(testAppender.events.get(1).getRenderedMessage(), 
"Pushing to broker:topic: message2");
+    Assert.assertEquals(testAppender.events.get(2).getRenderedMessage(), 
"Pushing to broker:topic: key - message3");
 
     logger.removeAppender(testAppender);
   }
 
-
   private class TestAppender extends AppenderSkeleton {
     List<LoggingEvent> events = new ArrayList<LoggingEvent>();
-    public void close() {}
-    public boolean requiresLayout() {return false;}
+
+    public void close() {
+    }
+
+    public boolean requiresLayout() {
+      return false;
+    }
+
     @Override
     protected void append(LoggingEvent event) {
       events.add(event);
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
new file mode 100644
index 0000000..a02ce3f
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class KeyValueEventObjectReporterTest extends 
KeyValueEventObjectReporter {
+
+  public KeyValueEventObjectReporterTest(Builder builder) {
+    super(builder);
+  }
+
+  public MockKeyValuePusher getPusher() {
+    return (MockKeyValuePusher) pusher;
+  }
+
+  public static class Builder extends KeyValueEventObjectReporter.Builder {
+
+    protected Builder(MetricContext context) {
+      super(context);
+    }
+
+    public KeyValueEventObjectReporterTest build(String brokers, String topic) 
{
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KeyValueEventObjectReporterTest(this);
+    }
+  }
+
+  /**
+   * Get builder for KeyValueEventObjectReporter
+   * @return KeyValueEventObjectReporter builder
+   */
+  public static KeyValueEventObjectReporterTest.Builder 
getBuilder(MetricContext context, Properties props) {
+    KeyValueEventObjectReporterTest.Builder builder = new 
KeyValueEventObjectReporterTest.Builder(context);
+    
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props))
+        .withConfig(ConfigUtils.propertiesToConfig(props));
+    return builder;
+  }
+
+  @Test
+  public static void testKafkaKeyValueEventObjectReporter()
+      throws IOException {
+    MetricContext context = MetricContext.builder("context").build();
+    String namespace = "org.apache.gobblin.metrics:gobblin.metrics.test";
+
+    Properties properties = new Properties();
+    
properties.put(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
 namespace);
+    properties.put("pusherClass", 
"org.apache.gobblin.metrics.reporter.MockKeyValuePusher");
+
+    KeyValueEventObjectReporterTest reporter = getBuilder(context, 
properties).build("localhost:0000", "topic");
+
+    String eventName = "testEvent";
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    Map<String, String> metadata = Maps.newHashMap();
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    reporter.report();
+
+    MockKeyValuePusher pusher = reporter.getPusher();
+    Pair<String, GenericRecord> retrievedEvent = 
nextKVEvent(pusher.messageIterator());
+
+    Assert.assertEquals(retrievedEvent.getValue().get("namespace"), namespace);
+    Assert.assertEquals(retrievedEvent.getValue().get("name"), eventName);
+    int partition = Integer.parseInt(retrievedEvent.getKey());
+    Assert.assertTrue((0 <= partition && partition <= 99));
+  }
+
+  private static Pair<String, GenericRecord> nextKVEvent(Iterator<Pair<String, 
GenericRecord>> it) {
+    Assert.assertTrue(it.hasNext());
+    Pair<String, GenericRecord> event = it.next();
+    return Pair.of(event.getKey(), event.getValue());
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
new file mode 100644
index 0000000..32b9620
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class KeyValueMetricObjectReporterTest extends 
KeyValueMetricObjectReporter {
+
+  private static final String TOPIC = 
KeyValueMetricObjectReporterTest.class.getSimpleName();
+
+  public KeyValueMetricObjectReporterTest(Builder builder, Config config) {
+    super(builder, config);
+  }
+
+  public MockKeyValuePusher getPusher() {
+    return (MockKeyValuePusher) pusher;
+  }
+
+  public static class Builder extends KeyValueMetricObjectReporter.Builder {
+
+    public KeyValueMetricObjectReporterTest build(String brokers, String 
topic, Config config)
+        throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KeyValueMetricObjectReporterTest(this, config);
+    }
+  }
+
+  /**
+   * Get builder for KeyValueMetricObjectReporter
+   * @return KeyValueMetricObjectReporter builder
+   */
+  public static KeyValueMetricObjectReporterTest.Builder getBuilder(Properties 
props) {
+    KeyValueMetricObjectReporterTest.Builder builder = new 
KeyValueMetricObjectReporterTest.Builder();
+    
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props));
+    return builder;
+  }
+
+  @Test
+  public static void testKafkaKeyValueMetricObjectReporter()
+      throws IOException {
+    MetricContext metricContext = MetricContext.builder("context").build();
+
+    String namespace = "org.apache.gobblin.metrics:gobblin.metrics.test";
+    String name = TOPIC;
+    Properties properties = new Properties();
+    
properties.put(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
 namespace);
+    properties.put("pusherClass", 
"org.apache.gobblin.metrics.reporter.MockKeyValuePusher");
+
+    KeyValueMetricObjectReporterTest reporter =
+        getBuilder(properties).build("localhost:0000", TOPIC, 
ConfigUtils.propertiesToConfig(properties));
+
+    reporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKeyValuePusher pusher = reporter.getPusher();
+    Pair<String, GenericRecord> retrievedEvent = 
nextKVReport(pusher.messageIterator());
+
+    Assert.assertEquals(retrievedEvent.getValue().getSchema().getNamespace(), 
"gobblin.metrics.test");
+    Assert.assertEquals(retrievedEvent.getValue().getSchema().getName(), name);
+    int partition = Integer.parseInt(retrievedEvent.getKey());
+    Assert.assertTrue((0 <= partition && partition <= 99));
+
+    reporter.close();
+  }
+
+  /**
+   * Extract the next metric from the Kafka iterator
+   * Assumes existence of the metric has already been checked.
+   * @param it Kafka ConsumerIterator
+   * @return next metric in the stream
+   * @throws IOException
+   */
+  protected static Pair<String, GenericRecord> 
nextKVReport(Iterator<Pair<String, GenericRecord>> it) {
+    Assert.assertTrue(it.hasNext());
+    return it.next();
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKeyValuePusher.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKeyValuePusher.java
new file mode 100644
index 0000000..959bf01
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKeyValuePusher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.collect.Queues;
+
+
+/**
+ * Mock instance of {@link org.apache.gobblin.metrics.reporter.KeyValuePusher} 
used to test
+ * {@link KeyValueMetricObjectReporter}
+ * {@link KeyValueEventObjectReporter}
+ */
+
+public class MockKeyValuePusher<K, V> implements KeyValuePusher<K, V> {
+
+  Queue<Pair<K, V>> messages = Queues.newLinkedBlockingQueue();
+
+  @Override
+  public void pushKeyValueMessages(List<Pair<K, V>> messages) {
+    this.messages.clear();
+    this.messages.addAll(messages);
+  }
+
+  @Override
+  public void pushMessages(List<V> messages) {
+
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+
+  }
+
+  public Iterator<Pair<K, V>> messageIterator() {
+    return this.messages.iterator();
+  }
+}
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 8f2c653..fccd46e 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -78,10 +78,12 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 
 import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * A Utils class for dealing with Avro objects
  */
+@Slf4j
 public class AvroUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(AvroUtils.class);
@@ -281,7 +283,7 @@ public class AvroUtils {
       } else if (data instanceof List) {
         val = getObjectFromArray((List)data, 
Integer.parseInt(pathList.get(field)));
       } else {
-        val = ((Record)data).get(pathList.get(field));
+        val = ((GenericRecord)data).get(pathList.get(field));
       }
 
       if (val != null) {
@@ -313,7 +315,7 @@ public class AvroUtils {
       return;
     }
 
-    AvroUtils.getFieldHelper(retVal, ((Record) data).get(pathList.get(field)), 
pathList, ++field);
+    AvroUtils.getFieldHelper(retVal, ((GenericRecord) 
data).get(pathList.get(field)), pathList, ++field);
     return;
   }
 
@@ -352,7 +354,12 @@ public class AvroUtils {
 
   private static Object getObjectFromMap(Map map, String key) {
     Utf8 utf8Key = new Utf8(key);
-    return map.get(utf8Key);
+    Object value = map.get(utf8Key);
+    if (value == null) {
+      return map.get(key);
+    }
+
+    return value;
   }
 
   /**
@@ -843,7 +850,6 @@ public class AvroUtils {
     return reader.read(null, decoder);
   }
 
-
   /**
    * Decorate the {@link Schema} for a record with additional {@link Field}s.
    * @param inputSchema: must be a {@link Record} schema.
@@ -875,11 +881,33 @@ public class AvroUtils {
   public static GenericRecord decorateRecord(GenericRecord inputRecord, 
@Nonnull Map<String, Object> fieldMap,
           Schema outputSchema) {
     GenericRecord outputRecord = new GenericData.Record(outputSchema);
-    inputRecord.getSchema().getFields().forEach(
-            f -> outputRecord.put(f.name(), inputRecord.get(f.name()))
-    );
+    inputRecord.getSchema().getFields().forEach(f -> 
outputRecord.put(f.name(), inputRecord.get(f.name())));
     fieldMap.forEach((key, value) -> outputRecord.put(key, value));
     return outputRecord;
   }
 
+  /**
+   * Given a generic record, Override the name and namespace of the schema and 
return a new generic record
+   * @param input input record who's name and namespace need to be overridden
+   * @param nameOverride new name for the record schema
+   * @param namespaceOverride Optional map containing namespace overrides
+   * @return an output record with overriden name and possibly namespace
+   */
+  public static GenericRecord overrideNameAndNamespace(GenericRecord input, 
String nameOverride, Optional<Map<String, String>> namespaceOverride) {
+
+    GenericRecord output = input;
+    Schema newSchema = switchName(input.getSchema(), nameOverride);
+    if(namespaceOverride.isPresent()) {
+      newSchema = switchNamespace(newSchema, namespaceOverride.get());
+    }
+
+    try {
+      output = convertRecordSchema(output, newSchema);
+    } catch (Exception e){
+      log.error("Unable to generate generic data record", e);
+    }
+
+    return output;
+  }
+
 }
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 94ce650..8031c7e 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -49,6 +49,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 
 
@@ -482,5 +483,35 @@ public class AvroUtilsTest {
     Assert.assertEquals(deserialized.get("metadata"), metadataRecord);
   }
 
+  @Test
+  public void overrideNameAndNamespaceTest() throws IOException{
+
+    String inputName = "input_name";
+    String inputNamespace = "input_namespace";
+    String outputName = "output_name";
+    String outputNamespace = "output_namespace";
+
+    Schema inputRecordSchema = 
SchemaBuilder.record(inputName).namespace(inputNamespace).fields()
+        .name("integer1")
+        .type().intBuilder().endInt().noDefault()
+        .endRecord();
+
+    GenericRecord inputRecord = new GenericData.Record(inputRecordSchema);
+    inputRecord.put("integer1", 10);
+
+    GenericRecord outputRecord = 
AvroUtils.overrideNameAndNamespace(inputRecord, outputName, 
Optional.of(Collections.EMPTY_MAP));
+    Assert.assertEquals(outputRecord.getSchema().getName(), outputName);
+    Assert.assertEquals(outputRecord.getSchema().getNamespace(), 
inputNamespace);
+    Assert.assertEquals(outputRecord.get("integer1"), 10);
+
+    Map<String,String> namespaceOverrideMap = new HashMap<>();
+    namespaceOverrideMap.put(inputNamespace,outputNamespace);
+
+    outputRecord = AvroUtils.overrideNameAndNamespace(inputRecord, outputName, 
Optional.of(namespaceOverrideMap));
+    Assert.assertEquals(outputRecord.getSchema().getName(), outputName);
+    Assert.assertEquals(outputRecord.getSchema().getNamespace(), 
outputNamespace);
+    Assert.assertEquals(outputRecord.get("integer1"), 10);
+
+  }
 
 }

Reply via email to