This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 856fbc1 [GOBBLIN-758] Added new reporters to emit MetricReport and
GobblinTrackingEvent without serializing them. Also added random key generator
for reporters.
856fbc1 is described below
commit 856fbc103e9fd144f062fb0ecaeba1f35f2f205b
Author: vbohra <[email protected]>
AuthorDate: Mon May 6 15:53:00 2019 -0700
[GOBBLIN-758] Added new reporters to emit MetricReport and
GobblinTrackingEvent without serializing them. Also added random key generator
for reporters.
Closes #2622 from vikrambohra/ETL-8675
---
.../gobblin/configuration/ConfigurationKeys.java | 8 +-
.../gobblin/metrics/KafkaReportingFormats.java | 217 +++++++++++++++------
.../metrics/kafka/KafkaReporterFactory.java | 57 +++---
.../gobblin/metrics/kafka/LoggingPusher.java | 25 ++-
.../apache/gobblin/metrics/kafka/PusherUtils.java | 5 +-
.../reporter/KeyValueEventObjectReporter.java | 161 +++++++++++++++
.../reporter/KeyValueMetricObjectReporter.java | 138 +++++++++++++
.../gobblin/metrics/reporter/KeyValuePusher.java | 30 +++
.../gobblin/metrics/kafka/LoggingPusherTest.java | 20 +-
.../reporter/KeyValueEventObjectReporterTest.java | 116 +++++++++++
.../reporter/KeyValueMetricObjectReporterTest.java | 113 +++++++++++
.../metrics/reporter/MockKeyValuePusher.java | 60 ++++++
.../java/org/apache/gobblin/util/AvroUtils.java | 42 +++-
.../org/apache/gobblin/util/AvroUtilsTest.java | 31 +++
14 files changed, 910 insertions(+), 113 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a5b56bb..b628138 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -666,7 +666,7 @@ public class ConfigurationKeys {
public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES =
METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX =
"metrics.reporting";
-
+ public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX =
METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
// File-based reporting
public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
METRICS_CONFIGURATIONS_PREFIX + "reporting.file.enabled";
@@ -690,6 +690,7 @@ public class ConfigurationKeys {
public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS =
"org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
public static final String METRICS_REPORTING_KAFKA_FORMAT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
+ public static final String METRICS_REPORTING_KAFKAPUSHERKEYS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafkaPusherKeys";
public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
@@ -705,6 +706,11 @@ public class ConfigurationKeys {
// Topic used only for event reporting.
public static final String METRICS_KAFKA_TOPIC_EVENTS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.topic.events";
+ // Key related configurations for raw metric and event key value reporters
+ public static final int DEFAULT_REPORTER_KEY_SIZE = 100;
+
+ public static final String METRICS_REPORTING_PUSHERKEYS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.pusherKeys";
+ public static final String METRICS_REPORTING_EVENTS_PUSHERKEYS =
METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".pusherKeys";
//Graphite-based reporting
public static final String METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY =
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index e9e981d..eb409ff 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -17,10 +17,13 @@
package org.apache.gobblin.metrics;
+import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import com.codahale.metrics.ScheduledReporter;
import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
@@ -28,7 +31,12 @@ import
org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.reporter.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.reporter.KeyValueMetricObjectReporter;
import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -36,68 +44,161 @@ import org.apache.gobblin.metrics.kafka.KafkaReporter;
*/
public enum KafkaReportingFormats {
- AVRO,
- AVRO_KEY_VALUE,
- JSON;
+ AVRO() {
+ @Override
+ public void buildMetricsReporter(String brokers, String topic, Properties
properties)
+ throws IOException {
+
+ KafkaAvroReporter.Builder<?> builder =
KafkaAvroReporter.BuilderFactory.newBuilder();
+ if
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+ builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ }
+ builder.build(brokers, topic, properties);
+ }
+
+ @Override
+ public ScheduledReporter buildEventsReporter(String brokers, String topic,
MetricContext context,
+ Properties properties)
+ throws IOException {
+
+ KafkaAvroEventReporter.Builder<?> builder =
KafkaAvroEventReporter.Factory.forContext(context);
+ if
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+ builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ }
+ String pusherClassName =
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ?
properties
+ .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) :
properties
+ .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+ builder.withPusherClassName(pusherClassName);
+
+ Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ // the kafka configuration is composed of the metrics reporting specific
keys with a fallback to the shared
+ // kafka config
+ Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+ .withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+ builder.withConfig(kafkaConfig);
+
+ return builder.build(brokers, topic);
+ }
+ }, AVRO_KEY_VALUE() {
+ @Override
+ public void buildMetricsReporter(String brokers, String topic, Properties
properties)
+ throws IOException {
+
+ throw new IOException("Unsupported format for Metric reporting " +
this.name());
+ }
+
+ @Override
+ public ScheduledReporter buildEventsReporter(String brokers, String topic,
MetricContext context,
+ Properties properties)
+ throws IOException {
+
+ KafkaAvroEventKeyValueReporter.Builder<?> builder =
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+ if
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
{
+ List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+ builder.withKeys(keys);
+ }
+ if
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+ builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+ }
+ String pusherClassName =
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ?
properties
+ .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) :
properties
+ .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+ builder.withPusherClassName(pusherClassName);
+
+ Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ // the kafka configuration is composed of the metrics reporting specific
keys with a fallback to the shared
+ // kafka config
+ Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+ .withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+ builder.withConfig(kafkaConfig);
+
+ return builder.build(brokers, topic);
+ }
+ }, JSON() {
+ @Override
+ public void buildMetricsReporter(String brokers, String topic, Properties
properties)
+ throws IOException {
+ KafkaReporter.Builder builder =
KafkaReporter.BuilderFactory.newBuilder();
+ builder.build(brokers, topic, properties);
+ }
+
+ @Override
+ public ScheduledReporter buildEventsReporter(String brokers, String topic,
MetricContext context,
+ Properties properties)
+ throws IOException {
+ KafkaEventReporter.Builder builder =
KafkaEventReporter.Factory.forContext(context);
+ //builder.withConfig(getEventsConfig(properties));
+ String pusherClassName =
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) ?
properties
+ .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) :
properties
+ .getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+ builder.withPusherClassName(pusherClassName);
+
+ Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ // the kafka configuration is composed of the metrics reporting specific
keys with a fallback to the shared
+ // kafka config
+ Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+ .withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+ builder.withConfig(kafkaConfig);
+
+ return builder.build(brokers, topic);
+ }
+ }, PLAIN_OBJECT() {
+ @Override
+ public void buildMetricsReporter(String brokers, String topic, Properties
properties)
+ throws IOException {
+
+ KeyValueMetricObjectReporter.Builder builder = new
KeyValueMetricObjectReporter.Builder();
+
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+ Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ Config config = ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX)
+ .withFallback(allConfig);
+ builder.build(brokers, topic, config);
+ }
+
+ @Override
+ public ScheduledReporter buildEventsReporter(String brokers, String topic,
MetricContext context,
+ Properties properties)
+ throws IOException {
+
+ KeyValueEventObjectReporter.Builder builder = new
KeyValueEventObjectReporter.Builder(context);
+ Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ Config config =
+ ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX)
+ .withFallback(allConfig);
+ builder.withConfig(config);
+
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+ return builder.build(brokers, topic);
+ }
+ };
/**
- * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for
this reporting format.
- *
- * @param properties {@link Properties} containing information to build
reporters.
- * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
+ * Method to build reporters that emit metrics. This method does not return
anything but schedules/starts the reporter internally
+ * @param brokers Kafka broker to connect
+ * @param topic Kafka topic to publish data
+ * @param properties Properties to build configurations from
+ * @throws IOException
*/
- public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties)
{
- switch (this) {
- case AVRO:
- KafkaAvroReporter.Builder<?> builder =
KafkaAvroReporter.BuilderFactory.newBuilder();
- if
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
- builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
- }
- return builder;
- case JSON:
- return KafkaReporter.BuilderFactory.newBuilder();
- default:
- // This should never happen.
- throw new IllegalArgumentException("KafkaReportingFormat not
recognized.");
- }
- }
+ public abstract void buildMetricsReporter(String brokers, String topic,
Properties properties)
+ throws IOException;
/**
- * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}
for this reporting format.
- * @param context {@link MetricContext} that should be reported.
- * @param properties {@link Properties} containing information to build
reporters.
- * @return {@link
org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
+ * Method to build reporters that emit events.
+ * @param brokers Kafka broker to connect
+ * @param topic Kafka topic to publish data
+ * @param context MetricContext to report
+ * @param properties Properties to build configurations from
+ * @return an instance of the event reporter
+ * @throws IOException
*/
- public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext
context, Properties properties) {
- switch (this) {
- case AVRO:
- KafkaAvroEventReporter.Builder<?> kafkaAvroEventReporterBuilder =
KafkaAvroEventReporter.Factory.forContext(context);
- if
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
- kafkaAvroEventReporterBuilder.withSchemaRegistry(new
KafkaAvroSchemaRegistry(properties));
- }
- return kafkaAvroEventReporterBuilder;
-
- case AVRO_KEY_VALUE:
- KafkaAvroEventKeyValueReporter.Builder<?>
kafkaAvroEventKeyValueReporterBuilder =
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
- if
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
{
- List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
-
.splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
- kafkaAvroEventKeyValueReporterBuilder.withKeys(keys);
- }
- if
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
- kafkaAvroEventKeyValueReporterBuilder.withSchemaRegistry(new
KafkaAvroSchemaRegistry(properties));
- }
- return kafkaAvroEventKeyValueReporterBuilder;
-
- case JSON:
- return KafkaEventReporter.Factory.forContext(context);
-
- default:
- // This should never happen.
- throw new IllegalArgumentException("KafkaReportingFormat not
recognized.");
- }
- }
+ public abstract ScheduledReporter buildEventsReporter(String brokers, String
topic, MetricContext context,
+ Properties properties)
+ throws IOException;
+
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index be95258..9f38379 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -24,13 +24,11 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
import org.apache.gobblin.metrics.KafkaReportingFormats;
import org.apache.gobblin.metrics.RootMetricContext;
-import org.apache.gobblin.util.ConfigUtils;
import lombok.extern.slf4j.Slf4j;
@@ -44,18 +42,23 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return null;
}
- log.info("Reporting metrics to Kafka");
+ log.info("Reporting metrics & events to Kafka");
- Optional<String> defaultTopic =
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
- Optional<String> metricsTopic = Optional.fromNullable(
- properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
- Optional<String> eventsTopic = Optional.fromNullable(
- properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+ Optional<String> defaultTopic =
+
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+ Optional<String> metricsTopic =
+
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+ Optional<String> eventsTopic =
+
Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
- if (metricsEnabled) log.info("Reporting metrics to Kafka");
+ if (metricsEnabled) {
+ log.info("Metrics enabled --- Reporting metrics to Kafka");
+ }
boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
- if (eventsEnabled) log.info("Reporting events to Kafka");
+ if (eventsEnabled) {
+ log.info("Events enabled --- Reporting events to Kafka");
+ }
try {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
@@ -75,15 +78,15 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
try {
formatEnum =
KafkaReportingFormats.valueOf(metricsReportingFormat.toUpperCase());
} catch (IllegalArgumentException exception) {
- log.warn("Kafka metrics reporting format " + metricsReportingFormat +
- " not recognized. Will report in json format.", exception);
+ log.warn(
+ "Kafka metrics reporting format " + metricsReportingFormat + " not
recognized. Will report in json format.",
+ exception);
formatEnum = KafkaReportingFormats.JSON;
}
if (metricsEnabled) {
try {
- formatEnum.metricReporterBuilder(properties)
- .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
+ formatEnum.buildMetricsReporter(brokers,
metricsTopic.or(defaultTopic).get(), properties);
} catch (IOException exception) {
log.error("Failed to create Kafka metrics reporter. Will not report
metrics to Kafka.", exception);
}
@@ -96,7 +99,8 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
try {
eventFormatEnum =
KafkaReportingFormats.valueOf(eventsReportingFormat.toUpperCase());
} catch (IllegalArgumentException exception) {
- log.warn("Kafka events reporting format " + eventsReportingFormat + "
not recognized. Will report in json format.",
+ log.warn(
+ "Kafka events reporting format " + eventsReportingFormat + " not
recognized. Will report in json format.",
exception);
eventFormatEnum = KafkaReportingFormats.JSON;
}
@@ -106,31 +110,16 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
if (eventsEnabled) {
try {
- KafkaEventReporter.Builder<?> builder =
eventFormatEnum.eventReporterBuilder(RootMetricContext.get(),
- properties);
-
- Config allConfig = ConfigUtils.propertiesToConfig(properties);
- // the kafka configuration is composed of the metrics reporting
specific keys with a fallback to the shared
- // kafka config
- Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
-
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
- ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
-
- builder.withConfig(kafkaConfig);
-
- String pusherClassName =
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
- ?
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
- : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
- PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
- builder.withPusherClassName(pusherClassName);
+ String eventTopic = eventsTopic.or(defaultTopic).get();
+ ScheduledReporter reporter =
+ eventFormatEnum.buildEventsReporter(brokers, eventTopic,
RootMetricContext.get(), properties);
- return builder.build(brokers, eventsTopic.or(defaultTopic).get());
+ return reporter;
} catch (IOException exception) {
log.error("Failed to create Kafka events reporter. Will not report
events to Kafka.", exception);
}
}
- log.info("Will start reporting metrics to Kafka");
return null;
}
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
index b86287e..7033a3a 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java
@@ -19,20 +19,24 @@ package org.apache.gobblin.metrics.kafka;
import java.io.IOException;
import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
import com.google.common.base.Optional;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.reporter.KeyValuePusher;
import org.apache.gobblin.util.ConfigUtils;
import lombok.extern.slf4j.Slf4j;
+
/**
* This is a {@Pusher} class that logs the messages
- * @param <M> message type
+ * @param <V> message type
*/
@Slf4j
-public class LoggingPusher<M> implements Pusher<M> {
+public class LoggingPusher<K, V> implements KeyValuePusher<K, V> {
private final String brokers;
private final String topic;
private static final String KAFKA_TOPIC = "kafka.topic";
@@ -56,13 +60,22 @@ public class LoggingPusher<M> implements Pusher<M> {
this.topic = topic;
}
- public void pushMessages(List<M> messages) {
- for (M message: messages) {
- log.info("Pushing to {}:{}: {}", this.brokers, this.topic,
message.toString());
+ @Override
+ public void close()
+ throws IOException {
+ }
+
+ @Override
+ public void pushKeyValueMessages(List<Pair<K, V>> messages) {
+ for (Pair<K, V> message : messages) {
+ log.info("Pushing to {}:{}: {} - {}", this.brokers, this.topic,
message.getKey(), message.getValue().toString());
}
}
@Override
- public void close() throws IOException {
+ public void pushMessages(List<V> messages) {
+ for (V message : messages) {
+ log.info("Pushing to {}:{}: {}", this.brokers, this.topic,
message.toString());
+ }
}
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
index 0d9b2a5..426b5ee 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
@@ -21,11 +21,13 @@ import com.typesafe.config.Config;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
public class PusherUtils {
public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX =
"metrics.reporting.kafka.config";
public static final String KAFKA_PUSHER_CLASS_NAME_KEY =
"metrics.reporting.kafkaPusherClass";
public static final String KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS =
"metrics.reporting.events.kafkaPusherClass";
public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME =
"org.apache.gobblin.metrics.kafka.KafkaPusher";
+ public static final String DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME =
"org.apache.gobblin.metrics.kafka.LoggingPusher";
/**
* Create a {@link Pusher}
@@ -39,8 +41,7 @@ public class PusherUtils {
try {
Class<?> pusherClass = Class.forName(pusherClassName);
- return (Pusher)
GobblinConstructorUtils.invokeLongestConstructor(pusherClass,
- brokers, topic, config);
+ return (Pusher)
GobblinConstructorUtils.invokeLongestConstructor(pusherClass, brokers, topic,
config);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Could not instantiate kafka pusher", e);
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
new file mode 100644
index 0000000..fb4e006
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.StringJoiner;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * This is a raw event (GobblinTrackingEvent) key value reporter that reports
events as GenericRecords without serialization
+ * Configuration for this reporter start with the prefix
"metrics.reporting.events"
+ */
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+ private static final String PUSHER_CONFIG = "pusherConfig";
+ private static final String PUSHER_CLASS = "pusherClass";
+ private static final String PUSHER_KEYS = "pusherKeys";
+ private static final String KEY_DELIMITER = ",";
+ private static final String KEY_SIZE_KEY = "keySize";
+
+ protected List<String> keys;
+ protected final String randomKey;
+ protected KeyValuePusher pusher;
+ protected Optional<Map<String, String>> namespaceOverride;
+ protected final String topic;
+
+ public KeyValueEventObjectReporter(Builder builder) {
+ super(builder);
+
+ this.topic = builder.topic;
+ this.namespaceOverride = builder.namespaceOverride;
+
+ Config config = builder.config.get();
+ Config pusherConfig = ConfigUtils.getConfigOrEmpty(config,
PUSHER_CONFIG).withFallback(config);
+ String pusherClassName =
+ ConfigUtils.getString(config, PUSHER_CLASS,
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+ this.pusher = (KeyValuePusher) PusherUtils
+ .getPusher(pusherClassName, builder.brokers, builder.topic,
Optional.of(pusherConfig));
+ this.closer.register(this.pusher);
+
+ randomKey = String.valueOf(
+ new Random().nextInt(ConfigUtils.getInt(config, KEY_SIZE_KEY,
ConfigurationKeys.DEFAULT_REPORTER_KEY_SIZE)));
+ if (config.hasPath(PUSHER_KEYS)) {
+ List<String> keys =
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(PUSHER_KEYS));
+ this.keys = keys;
+ } else {
+ log.info(
+ "Key not assigned from config. Please set it with property {} Using
randomly generated number {} as key ",
+ ConfigurationKeys.METRICS_REPORTING_EVENTS_PUSHERKEYS, randomKey);
+ }
+ }
+
+ @Override
+ public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+ log.info("Emitting report using KeyValueEventObjectReporter");
+
+ List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+ GobblinTrackingEvent event;
+
+ while (null != (event = queue.poll())) {
+ GenericRecord record = AvroUtils.overrideNameAndNamespace(event,
this.topic, this.namespaceOverride);
+ events.add(Pair.of(buildKey(record), record));
+ }
+
+ if (!events.isEmpty()) {
+ this.pusher.pushKeyValueMessages(events);
+ }
+ }
+
+ private String buildKey(GenericRecord record) {
+
+ String key = randomKey;
+ if (this.keys != null && this.keys.size() > 0) {
+ StringJoiner joiner = new StringJoiner(KEY_DELIMITER);
+ for (String keyPart : keys) {
+ Optional value = AvroUtils.getFieldValue(record, keyPart);
+ if (value.isPresent()) {
+ joiner.add(value.get().toString());
+ } else {
+ log.info("{} not found in the GobblinTrackingEvent. Setting key to
{}", keyPart, key);
+ return key;
+ }
+ }
+
+ key = joiner.toString();
+ }
+
+ return key;
+ }
+
+ public static class Builder extends EventReporter.Builder<Builder> {
+
+ protected String brokers;
+ protected String topic;
+ protected Optional<Config> config = Optional.absent();
+ protected Optional<Map<String, String>> namespaceOverride =
Optional.absent();
+
+ public Builder(MetricContext context) {
+ super(context);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ /**
+ * Set additional configuration.
+ */
+ public Builder withConfig(Config config) {
+ this.config = Optional.of(config);
+ return self();
+ }
+
+ public Builder namespaceOverride(Optional<Map<String, String>>
namespaceOverride) {
+ this.namespaceOverride = namespaceOverride;
+ return self();
+ }
+
+ public KeyValueEventObjectReporter build(String brokers, String topic) {
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KeyValueEventObjectReporter(this);
+ }
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
new file mode 100644
index 0000000..96372c0
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringJoiner;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * This is a raw metric (MetricReport) key value reporter that reports metrics
as GenericRecords without serialization
+ * Configuration for this reporter start with the prefix "metrics.reporting"
+ */
+@Slf4j
+public class KeyValueMetricObjectReporter extends MetricReportReporter {
+
+ private static final String PUSHER_CONFIG = "pusherConfig";
+ private static final String PUSHER_CLASS = "pusherClass";
+ private static final String PUSHER_KEYS = "pusherKeys";
+ private static final String KEY_DELIMITER = ",";
+ private static final String KEY_SIZE_KEY = "keySize";
+
+ private List<String> keys;
+ protected final String randomKey;
+ protected KeyValuePusher pusher;
+ private Optional<Map<String, String>> namespaceOverride;
+ protected final String topic;
+
+ public KeyValueMetricObjectReporter(Builder builder, Config config) {
+ super(builder, config);
+
+ this.topic = builder.topic;
+ this.namespaceOverride = builder.namespaceOverride;
+
+ Config pusherConfig = ConfigUtils.getConfigOrEmpty(config,
PUSHER_CONFIG).withFallback(config);
+ String pusherClassName =
+ ConfigUtils.getString(config, PUSHER_CLASS,
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+ this.pusher = (KeyValuePusher) PusherUtils
+ .getPusher(pusherClassName, builder.brokers, builder.topic,
Optional.of(pusherConfig));
+ this.closer.register(this.pusher);
+
+ randomKey = String.valueOf(
+ new Random().nextInt(ConfigUtils.getInt(config, KEY_SIZE_KEY,
ConfigurationKeys.DEFAULT_REPORTER_KEY_SIZE)));
+ if (config.hasPath(PUSHER_KEYS)) {
+ List<String> keys =
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(PUSHER_KEYS));
+ this.keys = keys;
+ } else {
+ log.info(
+ "Key not assigned from config. Please set it with property {} Using
randomly generated number {} as key ",
+ ConfigurationKeys.METRICS_REPORTING_PUSHERKEYS, randomKey);
+ }
+ }
+
+ @Override
+ protected void emitReport(MetricReport report) {
+ GenericRecord record = AvroUtils.overrideNameAndNamespace(report,
this.topic, this.namespaceOverride);
+
this.pusher.pushKeyValueMessages(Lists.newArrayList(Pair.of(buildKey(record),
record)));
+ }
+
+ private String buildKey(GenericRecord report) {
+
+ String key = randomKey;
+ if (this.keys != null && this.keys.size() > 0) {
+
+ StringJoiner joiner = new StringJoiner(KEY_DELIMITER);
+ for (String keyPart : keys) {
+ Optional value = AvroUtils.getFieldValue(report, keyPart);
+ if (value.isPresent()) {
+ joiner.add(value.get().toString());
+ } else {
+ log.error("{} not found in the MetricReport. Setting key to {}",
keyPart, key);
+ return key;
+ }
+ }
+
+ key = joiner.toString();
+ }
+
+ return key;
+ }
+
+ public static class Builder extends MetricReportReporter.Builder<Builder> {
+ protected String brokers;
+ protected String topic;
+ protected Optional<Map<String, String>> namespaceOverride =
Optional.absent();
+
+ public Builder namespaceOverride(Optional<Map<String, String>>
namespaceOverride) {
+ this.namespaceOverride = namespaceOverride;
+ return self();
+ }
+
+ public KeyValueMetricObjectReporter build(String brokers, String topic,
Config config)
+ throws IOException {
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KeyValueMetricObjectReporter(this, config);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValuePusher.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValuePusher.java
new file mode 100644
index 0000000..330bfc5
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValuePusher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+
+public interface KeyValuePusher<K, V> extends Pusher<V> {
+
+ void pushKeyValueMessages(List<Pair<K, V>> messages);
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
index 3e861de..e1635ea 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java
@@ -29,6 +29,8 @@ import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import org.apache.gobblin.metrics.reporter.KeyValuePusher;
+
@Test
public class LoggingPusherTest {
@@ -40,22 +42,30 @@ public class LoggingPusherTest {
Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
logger.addAppender(testAppender);
- LoggingPusher<String> loggingPusher = new LoggingPusher<String>("broker",
"topic", Optional.absent());
+ KeyValuePusher<String, String> loggingPusher =
+ new LoggingPusher<String, String>("broker", "topic",
Optional.absent());
loggingPusher.pushMessages(ImmutableList.of("message1", "message2"));
+
loggingPusher.pushKeyValueMessages(ImmutableList.of(org.apache.commons.lang3.tuple.Pair.of("key",
"message3")));
- Assert.assertEquals(testAppender.events.size(), 2);
+ Assert.assertEquals(testAppender.events.size(), 3);
Assert.assertEquals(testAppender.events.get(0).getRenderedMessage(),
"Pushing to broker:topic: message1");
Assert.assertEquals(testAppender.events.get(1).getRenderedMessage(),
"Pushing to broker:topic: message2");
+ Assert.assertEquals(testAppender.events.get(2).getRenderedMessage(),
"Pushing to broker:topic: key - message3");
logger.removeAppender(testAppender);
}
-
private class TestAppender extends AppenderSkeleton {
List<LoggingEvent> events = new ArrayList<LoggingEvent>();
- public void close() {}
- public boolean requiresLayout() {return false;}
+
+ public void close() {
+ }
+
+ public boolean requiresLayout() {
+ return false;
+ }
+
@Override
protected void append(LoggingEvent event) {
events.add(event);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
new file mode 100644
index 0000000..a02ce3f
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class KeyValueEventObjectReporterTest extends
KeyValueEventObjectReporter {
+
+ public KeyValueEventObjectReporterTest(Builder builder) {
+ super(builder);
+ }
+
+ public MockKeyValuePusher getPusher() {
+ return (MockKeyValuePusher) pusher;
+ }
+
+ public static class Builder extends KeyValueEventObjectReporter.Builder {
+
+ protected Builder(MetricContext context) {
+ super(context);
+ }
+
+ public KeyValueEventObjectReporterTest build(String brokers, String topic)
{
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KeyValueEventObjectReporterTest(this);
+ }
+ }
+
+ /**
+ * Get builder for KeyValueEventObjectReporter
+ * @return KeyValueEventObjectReporter builder
+ */
+ public static KeyValueEventObjectReporterTest.Builder
getBuilder(MetricContext context, Properties props) {
+ KeyValueEventObjectReporterTest.Builder builder = new
KeyValueEventObjectReporterTest.Builder(context);
+
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props))
+ .withConfig(ConfigUtils.propertiesToConfig(props));
+ return builder;
+ }
+
+ @Test
+ public static void testKafkaKeyValueEventObjectReporter()
+ throws IOException {
+ MetricContext context = MetricContext.builder("context").build();
+ String namespace = "org.apache.gobblin.metrics:gobblin.metrics.test";
+
+ Properties properties = new Properties();
+
properties.put(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
namespace);
+ properties.put("pusherClass",
"org.apache.gobblin.metrics.reporter.MockKeyValuePusher");
+
+ KeyValueEventObjectReporterTest reporter = getBuilder(context,
properties).build("localhost:0000", "topic");
+
+ String eventName = "testEvent";
+
+ GobblinTrackingEvent event = new GobblinTrackingEvent();
+ event.setName(eventName);
+ event.setNamespace(namespace);
+ Map<String, String> metadata = Maps.newHashMap();
+ event.setMetadata(metadata);
+ context.submitEvent(event);
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ reporter.report();
+
+ MockKeyValuePusher pusher = reporter.getPusher();
+ Pair<String, GenericRecord> retrievedEvent =
nextKVEvent(pusher.messageIterator());
+
+ Assert.assertEquals(retrievedEvent.getValue().get("namespace"), namespace);
+ Assert.assertEquals(retrievedEvent.getValue().get("name"), eventName);
+ int partition = Integer.parseInt(retrievedEvent.getKey());
+ Assert.assertTrue((0 <= partition && partition <= 99));
+ }
+
+ private static Pair<String, GenericRecord> nextKVEvent(Iterator<Pair<String,
GenericRecord>> it) {
+ Assert.assertTrue(it.hasNext());
+ Pair<String, GenericRecord> event = it.next();
+ return Pair.of(event.getKey(), event.getValue());
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
new file mode 100644
index 0000000..32b9620
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class KeyValueMetricObjectReporterTest extends
KeyValueMetricObjectReporter {
+
+ private static final String TOPIC =
KeyValueMetricObjectReporterTest.class.getSimpleName();
+
+ public KeyValueMetricObjectReporterTest(Builder builder, Config config) {
+ super(builder, config);
+ }
+
+ public MockKeyValuePusher getPusher() {
+ return (MockKeyValuePusher) pusher;
+ }
+
+ public static class Builder extends KeyValueMetricObjectReporter.Builder {
+
+ public KeyValueMetricObjectReporterTest build(String brokers, String
topic, Config config)
+ throws IOException {
+ this.brokers = brokers;
+ this.topic = topic;
+ return new KeyValueMetricObjectReporterTest(this, config);
+ }
+ }
+
+ /**
+ * Get builder for KeyValueMetricObjectReporter
+ * @return KeyValueMetricObjectReporter builder
+ */
+ public static KeyValueMetricObjectReporterTest.Builder getBuilder(Properties
props) {
+ KeyValueMetricObjectReporterTest.Builder builder = new
KeyValueMetricObjectReporterTest.Builder();
+
builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(props));
+ return builder;
+ }
+
+ @Test
+ public static void testKafkaKeyValueMetricObjectReporter()
+ throws IOException {
+ MetricContext metricContext = MetricContext.builder("context").build();
+
+ String namespace = "org.apache.gobblin.metrics:gobblin.metrics.test";
+ String name = TOPIC;
+ Properties properties = new Properties();
+
properties.put(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
namespace);
+ properties.put("pusherClass",
"org.apache.gobblin.metrics.reporter.MockKeyValuePusher");
+
+ KeyValueMetricObjectReporterTest reporter =
+ getBuilder(properties).build("localhost:0000", TOPIC,
ConfigUtils.propertiesToConfig(properties));
+
+ reporter.report(metricContext);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ MockKeyValuePusher pusher = reporter.getPusher();
+ Pair<String, GenericRecord> retrievedEvent =
nextKVReport(pusher.messageIterator());
+
+ Assert.assertEquals(retrievedEvent.getValue().getSchema().getNamespace(),
"gobblin.metrics.test");
+ Assert.assertEquals(retrievedEvent.getValue().getSchema().getName(), name);
+ int partition = Integer.parseInt(retrievedEvent.getKey());
+ Assert.assertTrue((0 <= partition && partition <= 99));
+
+ reporter.close();
+ }
+
+ /**
+ * Extract the next metric from the Kafka iterator
+ * Assumes existence of the metric has already been checked.
+ * @param it Kafka ConsumerIterator
+ * @return next metric in the stream
+ * @throws IOException
+ */
+ protected static Pair<String, GenericRecord>
nextKVReport(Iterator<Pair<String, GenericRecord>> it) {
+ Assert.assertTrue(it.hasNext());
+ return it.next();
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKeyValuePusher.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKeyValuePusher.java
new file mode 100644
index 0000000..959bf01
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKeyValuePusher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.collect.Queues;
+
+
+/**
+ * Mock instance of {@link org.apache.gobblin.metrics.reporter.KeyValuePusher}
used to test
+ * {@link KeyValueMetricObjectReporter}
+ * {@link KeyValueEventObjectReporter}
+ */
+
+public class MockKeyValuePusher<K, V> implements KeyValuePusher<K, V> {
+
+ Queue<Pair<K, V>> messages = Queues.newLinkedBlockingQueue();
+
+ @Override
+ public void pushKeyValueMessages(List<Pair<K, V>> messages) {
+ this.messages.clear();
+ this.messages.addAll(messages);
+ }
+
+ @Override
+ public void pushMessages(List<V> messages) {
+
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+
+ }
+
+ public Iterator<Pair<K, V>> messageIterator() {
+ return this.messages.iterator();
+ }
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 8f2c653..fccd46e 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -78,10 +78,12 @@ import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
/**
* A Utils class for dealing with Avro objects
*/
+@Slf4j
public class AvroUtils {
private static final Logger LOG = LoggerFactory.getLogger(AvroUtils.class);
@@ -281,7 +283,7 @@ public class AvroUtils {
} else if (data instanceof List) {
val = getObjectFromArray((List)data,
Integer.parseInt(pathList.get(field)));
} else {
- val = ((Record)data).get(pathList.get(field));
+ val = ((GenericRecord)data).get(pathList.get(field));
}
if (val != null) {
@@ -313,7 +315,7 @@ public class AvroUtils {
return;
}
- AvroUtils.getFieldHelper(retVal, ((Record) data).get(pathList.get(field)),
pathList, ++field);
+ AvroUtils.getFieldHelper(retVal, ((GenericRecord)
data).get(pathList.get(field)), pathList, ++field);
return;
}
@@ -352,7 +354,12 @@ public class AvroUtils {
private static Object getObjectFromMap(Map map, String key) {
Utf8 utf8Key = new Utf8(key);
- return map.get(utf8Key);
+ Object value = map.get(utf8Key);
+ if (value == null) {
+ return map.get(key);
+ }
+
+ return value;
}
/**
@@ -843,7 +850,6 @@ public class AvroUtils {
return reader.read(null, decoder);
}
-
/**
* Decorate the {@link Schema} for a record with additional {@link Field}s.
* @param inputSchema: must be a {@link Record} schema.
@@ -875,11 +881,33 @@ public class AvroUtils {
public static GenericRecord decorateRecord(GenericRecord inputRecord,
@Nonnull Map<String, Object> fieldMap,
Schema outputSchema) {
GenericRecord outputRecord = new GenericData.Record(outputSchema);
- inputRecord.getSchema().getFields().forEach(
- f -> outputRecord.put(f.name(), inputRecord.get(f.name()))
- );
+ inputRecord.getSchema().getFields().forEach(f ->
outputRecord.put(f.name(), inputRecord.get(f.name())));
fieldMap.forEach((key, value) -> outputRecord.put(key, value));
return outputRecord;
}
+ /**
+ * Given a generic record, Override the name and namespace of the schema and
return a new generic record
+ * @param input input record who's name and namespace need to be overridden
+ * @param nameOverride new name for the record schema
+ * @param namespaceOverride Optional map containing namespace overrides
+ * @return an output record with overriden name and possibly namespace
+ */
+ public static GenericRecord overrideNameAndNamespace(GenericRecord input,
String nameOverride, Optional<Map<String, String>> namespaceOverride) {
+
+ GenericRecord output = input;
+ Schema newSchema = switchName(input.getSchema(), nameOverride);
+ if(namespaceOverride.isPresent()) {
+ newSchema = switchNamespace(newSchema, namespaceOverride.get());
+ }
+
+ try {
+ output = convertRecordSchema(output, newSchema);
+ } catch (Exception e){
+ log.error("Unable to generate generic data record", e);
+ }
+
+ return output;
+ }
+
}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 94ce650..8031c7e 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -49,6 +49,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Optional;
import com.google.common.collect.Maps;
@@ -482,5 +483,35 @@ public class AvroUtilsTest {
Assert.assertEquals(deserialized.get("metadata"), metadataRecord);
}
+ @Test
+ public void overrideNameAndNamespaceTest() throws IOException{
+
+ String inputName = "input_name";
+ String inputNamespace = "input_namespace";
+ String outputName = "output_name";
+ String outputNamespace = "output_namespace";
+
+ Schema inputRecordSchema =
SchemaBuilder.record(inputName).namespace(inputNamespace).fields()
+ .name("integer1")
+ .type().intBuilder().endInt().noDefault()
+ .endRecord();
+
+ GenericRecord inputRecord = new GenericData.Record(inputRecordSchema);
+ inputRecord.put("integer1", 10);
+
+ GenericRecord outputRecord =
AvroUtils.overrideNameAndNamespace(inputRecord, outputName,
Optional.of(Collections.EMPTY_MAP));
+ Assert.assertEquals(outputRecord.getSchema().getName(), outputName);
+ Assert.assertEquals(outputRecord.getSchema().getNamespace(),
inputNamespace);
+ Assert.assertEquals(outputRecord.get("integer1"), 10);
+
+ Map<String,String> namespaceOverrideMap = new HashMap<>();
+ namespaceOverrideMap.put(inputNamespace,outputNamespace);
+
+ outputRecord = AvroUtils.overrideNameAndNamespace(inputRecord, outputName,
Optional.of(namespaceOverrideMap));
+ Assert.assertEquals(outputRecord.getSchema().getName(), outputName);
+ Assert.assertEquals(outputRecord.getSchema().getNamespace(),
outputNamespace);
+ Assert.assertEquals(outputRecord.get("integer1"), 10);
+
+ }
}