Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 568f0f4d8 -> b2dd7ddfc


[GOBBLIN-642] Implement KafkaAvroEventKeyValueReporter

Closes #2511 from arjun4084346/keyvaluerreporter


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b2dd7ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b2dd7ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b2dd7ddf

Branch: refs/heads/master
Commit: b2dd7ddfc4561fd42f439a6d7357daf5e7fb5f1b
Parents: 568f0f4
Author: Arjun <[email protected]>
Authored: Fri Dec 7 08:59:53 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Fri Dec 7 08:59:53 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   2 +
 .../gobblin/metrics/KafkaReportingFormats.java  |  26 +++-
 .../kafka/KafkaAvroEventKeyValueReporter.java   | 112 ++++++++++++++++
 .../kafka/KafkaEventKeyValueReporter.java       | 133 +++++++++++++++++++
 .../metrics/kafka/KafkaEventReporter.java       |   2 +-
 .../metrics/kafka/KafkaReporterFactory.java     |  30 ++++-
 .../gobblin/metrics/kafka/PusherUtils.java      |   1 +
 .../KafkaAvroEventKeyValueReporterTest.java     | 101 ++++++++++++++
 .../reporter/MockKafkaKeyValuePusher.java       |  49 +++++++
 9 files changed, 446 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 510f7cf..34b227e 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -681,6 +681,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_ENABLED = 
Boolean.toString(false);
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS = 
"org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
   public static final String METRICS_REPORTING_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
+  public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
+  public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
   public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
       METRICS_CONFIGURATIONS_PREFIX + 
"reporting.kafka.avro.use.schema.registry";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index a0af52a..e9e981d 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -17,9 +17,13 @@
 
 package org.apache.gobblin.metrics;
 
+import java.util.List;
 import java.util.Properties;
 
+import com.google.common.base.Splitter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
@@ -33,6 +37,7 @@ import org.apache.gobblin.metrics.kafka.KafkaReporter;
 public enum KafkaReportingFormats {
 
   AVRO,
+  AVRO_KEY_VALUE,
   JSON;
 
   /**
@@ -67,14 +72,29 @@ public enum KafkaReportingFormats {
   public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext 
context, Properties properties) {
     switch (this) {
       case AVRO:
-        KafkaAvroEventReporter.Builder<?> builder = 
KafkaAvroEventReporter.Factory.forContext(context);
+        KafkaAvroEventReporter.Builder<?> kafkaAvroEventReporterBuilder = 
KafkaAvroEventReporter.Factory.forContext(context);
         if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
             
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+          kafkaAvroEventReporterBuilder.withSchemaRegistry(new 
KafkaAvroSchemaRegistry(properties));
         }
-        return builder;
+        return kafkaAvroEventReporterBuilder;
+
+      case AVRO_KEY_VALUE:
+        KafkaAvroEventKeyValueReporter.Builder<?> 
kafkaAvroEventKeyValueReporterBuilder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+        if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
+          List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+              
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+          kafkaAvroEventKeyValueReporterBuilder.withKeys(keys);
+        }
+        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+          kafkaAvroEventKeyValueReporterBuilder.withSchemaRegistry(new 
KafkaAvroSchemaRegistry(properties));
+        }
+        return kafkaAvroEventKeyValueReporterBuilder;
+
       case JSON:
         return KafkaEventReporter.Factory.forContext(context);
+
       default:
         // This should never happen.
         throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
new file mode 100644
index 0000000..bf6a07e
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+
+/**
+ * Implementation of {@link KafkaEventKeyValueReporter} for Avro records.
+ */
+@Slf4j
+public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter 
{
+
+  protected KafkaAvroEventKeyValueReporter(Builder<?> builder) throws 
IOException {
+    super(builder);
+    if(builder.registry.isPresent()) {
+      Schema schema =
+          new 
Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+      this.serializer.setSchemaVersionWriter(new 
SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
+          Optional.of(schema)));
+    }
+  }
+
+  @Override
+  protected AvroSerializer<GobblinTrackingEvent> 
createSerializer(SchemaVersionWriter schemaVersionWriter)
+      throws IOException {
+    return new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, 
schemaVersionWriter);
+  }
+
+  private static class BuilderImpl extends Builder<BuilderImpl> {
+    private BuilderImpl(MetricContext context) {
+      super(context);
+    }
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  public static abstract class Factory {
+    /**
+     * Returns a new {@link Builder} for {@link 
KafkaAvroEventKeyValueReporter}.
+     *
+     * @param context the {@link MetricContext} to report
+     * @return a {@link KafkaAvroEventKeyValueReporter} builder
+     */
+    public static BuilderImpl forContext(MetricContext context) {
+      return new BuilderImpl(context);
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaAvroEventKeyValueReporter}.
+   * Defaults to no filter, reporting rates in seconds and times in 
milliseconds.
+   */
+  public static abstract class Builder<T extends Builder<T>> extends 
KafkaEventKeyValueReporter.Builder<T> {
+    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();
+
+    private Builder(MetricContext context) {
+      super(context);
+    }
+
+    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
+      this.registry = Optional.of(registry);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaAvroEventKeyValueReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaAvroEventKeyValueReporter
+     */
+    public KafkaAvroEventKeyValueReporter build(String brokers, String topic) 
throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KafkaAvroEventKeyValueReporter(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
new file mode 100644
index 0000000..cfe0dbc
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import org.apache.commons.lang3.tuple.Pair;
+import com.google.common.base.Splitter;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+
+/**
+ * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events 
to Kafka as serialized Avro records with a key.
+ * The key for each Kafka message is the concatenation, in order, of the event metadata values for the properties listed 
in {@link ConfigurationKeys#METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS}.
+ * If the GobblinTrackingEvent is missing any one of the required properties, 
the key is set to null. In that case, this reporter
+ * will act like a {@link 
org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter}
+ */
+@Slf4j
+public class KafkaEventKeyValueReporter extends KafkaEventReporter {
+  private Optional<List<String>> keys = Optional.absent();
+
+  protected KafkaEventKeyValueReporter(Builder<?> builder) throws IOException {
+    super(builder);
+    if (builder.keys.size() > 0) {
+      this.keys = Optional.of(builder.keys);
+    } else {
+      log.warn("Cannot find keys for key-value reporter. Please set it with 
property {}",
+          ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS);
+    }
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    GobblinTrackingEvent nextEvent;
+    List<Pair<String, byte[]>> events = Lists.newArrayList();
+
+    while(null != (nextEvent = queue.poll())) {
+      StringBuilder sb = new StringBuilder();
+      String key = null;
+      if (keys.isPresent()) {
+        for (String keyPart : keys.get()) {
+          if (nextEvent.getMetadata().containsKey(keyPart)) {
+            sb.append(nextEvent.getMetadata().get(keyPart));
+          } else {
+            log.error("{} not found in the GobblinTrackingEvent. Setting key 
to null.", keyPart);
+            sb = null;
+            break;
+          }
+        }
+        key = (sb == null) ? null : sb.toString();
+      }
+      events.add(Pair.of(key, this.serializer.serializeRecord(nextEvent)));
+    }
+
+    if (!events.isEmpty()) {
+      this.kafkaPusher.pushMessages(events);
+    }
+  }
+
+  private static class BuilderImpl extends Builder<BuilderImpl> {
+    private BuilderImpl(MetricContext context) {
+      super(context);
+    }
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  public static abstract class Factory {
+    /**
+     * Returns a new {@link Builder} for {@link KafkaEventKeyValueReporter}.
+     *
+     * @param context the {@link MetricContext} to report
+     * @return a {@link KafkaEventKeyValueReporter} builder
+     */
+    public static BuilderImpl forContext(MetricContext context) {
+      return new BuilderImpl(context);
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaEventKeyValueReporter}.
+   * Defaults to no filter, reporting rates in seconds and times in 
milliseconds.
+   */
+  public static abstract class Builder<T extends Builder<T>> extends 
KafkaEventReporter.Builder<T> {
+    private List<String> keys = Lists.newArrayList();
+
+    protected Builder(MetricContext context) {
+      super(context);
+    }
+
+    public T withKeys(List<String> keys) {
+      this.keys = keys;
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaEventKeyValueReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaEventKeyValueReporter
+     */
+    public KafkaEventKeyValueReporter build(String brokers, String topic) 
throws IOException {
+      this.brokers = brokers;
+      this.topic = topic;
+      return new KafkaEventKeyValueReporter(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
index 804d909..8c5d3be 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -40,7 +40,7 @@ import 
org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 public class KafkaEventReporter extends EventReporter {
 
   protected final AvroSerializer<GobblinTrackingEvent> serializer;
-  private final Pusher kafkaPusher;
+  protected final Pusher kafkaPusher;
 
   public KafkaEventReporter(Builder<?> builder) throws IOException {
     super(builder);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 4dcf717..be95258 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -68,14 +68,14 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
 
     String brokers = 
properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
 
-    String reportingFormat = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
+    String metricsReportingFormat = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
 
     KafkaReportingFormats formatEnum;
     try {
-      formatEnum = 
KafkaReportingFormats.valueOf(reportingFormat.toUpperCase());
+      formatEnum = 
KafkaReportingFormats.valueOf(metricsReportingFormat.toUpperCase());
     } catch (IllegalArgumentException exception) {
-      log.warn("Kafka metrics reporting format " + reportingFormat +
+      log.warn("Kafka metrics reporting format " + metricsReportingFormat +
           " not recognized. Will report in json format.", exception);
       formatEnum = KafkaReportingFormats.JSON;
     }
@@ -89,9 +89,24 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
       }
     }
 
+    KafkaReportingFormats eventFormatEnum;
+    if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT))
 {
+      String eventsReportingFormat = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
+      try {
+        eventFormatEnum = 
KafkaReportingFormats.valueOf(eventsReportingFormat.toUpperCase());
+      } catch (IllegalArgumentException exception) {
+        log.warn("Kafka events reporting format " + eventsReportingFormat + " 
not recognized. Will report in json format.",
+            exception);
+        eventFormatEnum = KafkaReportingFormats.JSON;
+      }
+    } else {
+      eventFormatEnum = formatEnum;
+    }
+
     if (eventsEnabled) {
       try {
-        KafkaEventReporter.Builder<?> builder = 
formatEnum.eventReporterBuilder(RootMetricContext.get(),
+        KafkaEventReporter.Builder<?> builder = 
eventFormatEnum.eventReporterBuilder(RootMetricContext.get(),
             properties);
 
         Config allConfig = ConfigUtils.propertiesToConfig(properties);
@@ -103,8 +118,11 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
 
         builder.withConfig(kafkaConfig);
 
-        
builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
-            PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME));
+        String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+            ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+            : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+                PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+        builder.withPusherClassName(pusherClassName);
 
         return builder.build(brokers, eventsTopic.or(defaultTopic).get());
       } catch (IOException exception) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
index a76c750..0d9b2a5 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
@@ -24,6 +24,7 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 public class PusherUtils {
   public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = 
"metrics.reporting.kafka.config";
   public static final String KAFKA_PUSHER_CLASS_NAME_KEY = 
"metrics.reporting.kafkaPusherClass";
+  public static final String KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS = 
"metrics.reporting.events.kafkaPusherClass";
   public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = 
"org.apache.gobblin.metrics.kafka.KafkaPusher";
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
new file mode 100644
index 0000000..d4e5939
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.collect.Lists;
+
+
+public class KafkaAvroEventKeyValueReporterTest extends 
KafkaAvroEventReporterTest {
+
+  @Override
+  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> 
getBuilder(MetricContext context,
+                                                                               
      Pusher pusher) {
+    KafkaAvroEventKeyValueReporter.Builder<?> builder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+    return builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList("k1", 
"k2", "k3"));
+  }
+
+  private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, 
byte[]>> it) throws IOException {
+    Assert.assertTrue(it.hasNext());
+    Pair<String, byte[]> event = it.next();
+    return Pair.of(event.getKey(), 
EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), 
event.getValue()));
+  }
+
+  @Test
+  public void testKafkaEventReporter() throws IOException {
+    MetricContext context = MetricContext.builder("context").build();
+
+    MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, 
pusher).build("localhost:0000", "topic");
+
+    String namespace = "gobblin.metrics.test";
+    String eventName = "testEvent";
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("m1", "v1");
+    metadata.put("m2", null);
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    kafkaReporter.report();
+
+    Pair<String, GobblinTrackingEvent> retrievedEvent = 
nextKVEvent(pusher.messageIterator());
+    Assert.assertNull(retrievedEvent.getKey());
+
+    event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    metadata = Maps.newHashMap();
+    metadata.put("k1", "v1");
+    metadata.put("k2", "v2");
+    metadata.put("k3", "v3");
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    kafkaReporter.report();
+
+    retrievedEvent = nextKVEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getKey(), "v1v2v3");
+  }
+
+  @Test (enabled=false)
+  public void testTagInjection() throws IOException {
+    // This test is not applicable for testing KafkaAvroEventKeyValueReporter
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java
new file mode 100644
index 0000000..adbaf57
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.collect.Queues;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used to 
test {@link org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter}.
+ */
+public class MockKafkaKeyValuePusher<K, V> implements Pusher<Pair<K, V>> {
+
+  Queue<Pair<K, V>> messages = Queues.newLinkedBlockingQueue();
+
+  @Override
+  public void pushMessages(List<Pair<K, V>> messages) {
+    this.messages.clear();
+    this.messages.addAll(messages);
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  public Iterator<Pair<K, V>> messageIterator() {
+    return this.messages.iterator();
+  }
+}

Reply via email to