zxcware commented on a change in pull request #2622: ETL-8363 Added random key 
generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r279067356
 
 

 ##########
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 ##########
 @@ -17,87 +17,179 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaAvroMetricKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
 
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for 
this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) 
{
-    switch (this) {
-      case AVRO:
-        KafkaAvroReporter.Builder<?> builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
-        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaReporter.BuilderFactory.newBuilder();
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
+  AVRO() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+
+      KafkaAvroReporter.Builder<?> builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.build(brokers, topic, properties);
+
     }
-  }
 
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} 
for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link 
org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
-   */
-  public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext 
context, Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroEventReporter.Builder<?> kafkaAvroEventReporterBuilder = 
KafkaAvroEventReporter.Factory.forContext(context);
-        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          kafkaAvroEventReporterBuilder.withSchemaRegistry(new 
KafkaAvroSchemaRegistry(properties));
-        }
-        return kafkaAvroEventReporterBuilder;
-
-      case AVRO_KEY_VALUE:
-        KafkaAvroEventKeyValueReporter.Builder<?> 
kafkaAvroEventKeyValueReporterBuilder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
-        if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
-          List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
-              
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
-          kafkaAvroEventKeyValueReporterBuilder.withKeys(keys);
-        }
-        if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          kafkaAvroEventKeyValueReporterBuilder.withSchemaRegistry(new 
KafkaAvroSchemaRegistry(properties));
-        }
-        return kafkaAvroEventKeyValueReporterBuilder;
-
-      case JSON:
-        return KafkaEventReporter.Factory.forContext(context);
-
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, 
String topic, MetricContext context, Properties properties) throws IOException {
+
+      KafkaAvroEventReporter.Builder<?> builder = 
KafkaAvroEventReporter.Factory.forContext(context);
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.withConfig(getEventsKafkaConfig(properties));
+      String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+
+      return builder.build(brokers, topic);
+
+    }
+  },
+  AVRO_KEY_VALUE() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+
+      KafkaAvroMetricKeyValueReporter.Builder<?> builder = 
KafkaAvroMetricKeyValueReporter.Factory.newBuilder();
+      if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
+        List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+            
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+        builder.withKeys(keys);
+      }
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.build(brokers, topic, properties);
+
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, 
String topic, MetricContext context, Properties properties) throws IOException {
+
+      KafkaAvroEventKeyValueReporter.Builder<?> builder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+      if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
+        List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+            
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+        builder.withKeys(keys);
+      }
+      if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.withConfig(getEventsKafkaConfig(properties));
+      String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      return builder.build(brokers, topic);
+
     }
+  },
+  JSON() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+      KafkaReporter.Builder builder = 
KafkaReporter.BuilderFactory.newBuilder();
+      builder.build(brokers, topic, properties);
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, 
String topic, MetricContext context, Properties properties) throws IOException {
+       KafkaEventReporter.Builder builder = 
KafkaEventReporter.Factory.forContext(context);
+       builder.withConfig(getEventsKafkaConfig(properties));
+       String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+       builder.withPusherClassName(pusherClassName);
+       return builder.build(brokers, topic);
+    }
+  },
+  PLAIN_OBJECT() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+
+      KafkaKeyValueMetricObjectReporter.Builder<?> builder = 
KafkaKeyValueMetricObjectReporter.Factory.newBuilder();
+      if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
 {
+        List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+            
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+        builder.withKeys(keys);
 
 Review comment:
   This is the tricky point. It's hard to define key in the reporter for a 
pusher that pushes the key:
   
   - Should we support just one field?
   - If we want to support multiple fields, how should they be combined — by 
concatenation, or by interpolating them into a pattern?
   - Should we support a key object other than data taken from the 
event/metric object itself?
   
   We shouldn't assume how keys are composed, but rather provide a reasonable 
default way to compute the key. Here, we are making too many assumptions. I 
suggest you move the key-computation logic into the `report` or `sendEvent` 
method instead of building it into the builder.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to