Repository: incubator-gobblin Updated Branches: refs/heads/master 568f0f4d8 -> b2dd7ddfc
[GOBBLIN-642] Implement KafkaAvroEventKeyValueReporter Closes #2511 from arjun4084346/keyvaluerreporter Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b2dd7ddf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b2dd7ddf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b2dd7ddf Branch: refs/heads/master Commit: b2dd7ddfc4561fd42f439a6d7357daf5e7fb5f1b Parents: 568f0f4 Author: Arjun <[email protected]> Authored: Fri Dec 7 08:59:53 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Dec 7 08:59:53 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 2 + .../gobblin/metrics/KafkaReportingFormats.java | 26 +++- .../kafka/KafkaAvroEventKeyValueReporter.java | 112 ++++++++++++++++ .../kafka/KafkaEventKeyValueReporter.java | 133 +++++++++++++++++++ .../metrics/kafka/KafkaEventReporter.java | 2 +- .../metrics/kafka/KafkaReporterFactory.java | 30 ++++- .../gobblin/metrics/kafka/PusherUtils.java | 1 + .../KafkaAvroEventKeyValueReporterTest.java | 101 ++++++++++++++ .../reporter/MockKafkaKeyValuePusher.java | 49 +++++++ 9 files changed, 446 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 510f7cf..34b227e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -681,6 +681,8 @@ public class ConfigurationKeys { public static final String DEFAULT_METRICS_REPORTING_KAFKA_ENABLED = Boolean.toString(false); public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS = "org.apache.gobblin.metrics.kafka.KafkaReporterFactory"; public static final String METRICS_REPORTING_KAFKA_FORMAT = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format"; + public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT = METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format"; + public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS = METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys"; public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json"; public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY = METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.avro.use.schema.registry"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java index a0af52a..e9e981d 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java @@ -17,9 +17,13 @@ package org.apache.gobblin.metrics; +import java.util.List; import java.util.Properties; +import com.google.common.base.Splitter; + import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter; import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter; import org.apache.gobblin.metrics.kafka.KafkaAvroReporter; import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry; @@ -33,6 +37,7 @@ import org.apache.gobblin.metrics.kafka.KafkaReporter; public enum KafkaReportingFormats { AVRO, + AVRO_KEY_VALUE, JSON; /** @@ -67,14 +72,29 @@ public enum KafkaReportingFormats { public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) { switch (this) { case AVRO: - KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context); + KafkaAvroEventReporter.Builder<?> kafkaAvroEventReporterBuilder = KafkaAvroEventReporter.Factory.forContext(context); if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { - builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); + kafkaAvroEventReporterBuilder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); } - return builder; + return kafkaAvroEventReporterBuilder; + + case AVRO_KEY_VALUE: + KafkaAvroEventKeyValueReporter.Builder<?> kafkaAvroEventKeyValueReporterBuilder = KafkaAvroEventKeyValueReporter.Factory.forContext(context); + if (properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS)) { + List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults() + .splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS)); + kafkaAvroEventKeyValueReporterBuilder.withKeys(keys); + } + if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { + kafkaAvroEventKeyValueReporterBuilder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); + } + return kafkaAvroEventKeyValueReporterBuilder; + case JSON: return KafkaEventReporter.Factory.forContext(context); + default: // This should never happen. throw new IllegalArgumentException("KafkaReportingFormat not recognized."); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java new file mode 100644 index 0000000..bf6a07e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventKeyValueReporter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; +import org.apache.gobblin.metrics.reporter.util.AvroSerializer; +import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter; +import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.avro.Schema; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + + +/** + * Implement of {@link KafkaEventKeyValueReporter} for avro records. + */ +@Slf4j +public class KafkaAvroEventKeyValueReporter extends KafkaEventKeyValueReporter { + + protected KafkaAvroEventKeyValueReporter(Builder<?> builder) throws IOException { + super(builder); + if(builder.registry.isPresent()) { + Schema schema = + new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")); + this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, + Optional.of(schema))); + } + } + + @Override + protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) + throws IOException { + return new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter); + } + + private static class BuilderImpl extends Builder<BuilderImpl> { + private BuilderImpl(MetricContext context) { + super(context); + } + + @Override + protected BuilderImpl self() { + return this; + } + } + + public static abstract class Factory { + /** + * Returns a new {@link Builder} for {@link KafkaAvroEventKeyValueReporter}. + * + * @param context the {@link MetricContext} to report + * @return KafkaAvroReporter builder + */ + public static BuilderImpl forContext(MetricContext context) { + return new BuilderImpl(context); + } + } + + /** + * Builder for {@link KafkaAvroEventKeyValueReporter}. + * Defaults to no filter, reporting rates in seconds and times in milliseconds. + */ + public static abstract class Builder<T extends Builder<T>> extends KafkaEventKeyValueReporter.Builder<T> { + private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent(); + + private Builder(MetricContext context) { + super(context); + } + + public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) { + this.registry = Optional.of(registry); + return self(); + } + + /** + * Builds and returns {@link KafkaAvroEventReporter}. + * + * @param brokers string of Kafka brokers + * @param topic topic to send metrics to + * @return KafkaAvroReporter + */ + public KafkaAvroEventKeyValueReporter build(String brokers, String topic) throws IOException { + this.brokers = brokers; + this.topic = topic; + return new KafkaAvroEventKeyValueReporter(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java new file mode 100644 index 0000000..cfe0dbc --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Queue; +import org.apache.commons.lang3.tuple.Pair; +import com.google.common.base.Splitter; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; + +/** + * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records with a key. + * Key for these kafka messages is obtained from values of properties provided via {@link ConfigurationKeys#METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS}. + * If the GobblinTrackingEvent does not contain any of the required property, key is set to null. In that case, this reporter + * will act like a {@link org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter} + */ +@Slf4j +public class KafkaEventKeyValueReporter extends KafkaEventReporter { + private Optional<List<String>> keys = Optional.absent(); + + protected KafkaEventKeyValueReporter(Builder<?> builder) throws IOException { + super(builder); + if (builder.keys.size() > 0) { + this.keys = Optional.of(builder.keys); + } else { + log.warn("Cannot find keys for key-value reporter. Please set it with property {}", + ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS); + } + } + + @Override + public void reportEventQueue(Queue<GobblinTrackingEvent> queue) { + GobblinTrackingEvent nextEvent; + List<Pair<String, byte[]>> events = Lists.newArrayList(); + + while(null != (nextEvent = queue.poll())) { + StringBuilder sb = new StringBuilder(); + String key = null; + if (keys.isPresent()) { + for (String keyPart : keys.get()) { + if (nextEvent.getMetadata().containsKey(keyPart)) { + sb.append(nextEvent.getMetadata().get(keyPart)); + } else { + log.error("{} not found in the GobblinTrackingEvent. Setting key to null.", keyPart); + sb = null; + break; + } + } + key = (sb == null) ? null : sb.toString(); + } + events.add(Pair.of(key, this.serializer.serializeRecord(nextEvent))); + } + + if (!events.isEmpty()) { + this.kafkaPusher.pushMessages(events); + } + } + + private static class BuilderImpl extends Builder<BuilderImpl> { + private BuilderImpl(MetricContext context) { + super(context); + } + + @Override + protected BuilderImpl self() { + return this; + } + } + + public static abstract class Factory { + /** + * Returns a new {@link Builder} for {@link KafkaEventKeyValueReporter}. + * + * @param context the {@link MetricContext} to report + * @return KafkaAvroReporter builder + */ + public static BuilderImpl forContext(MetricContext context) { + return new BuilderImpl(context); + } + } + + /** + * Builder for {@link KafkaEventKeyValueReporter}. + * Defaults to no filter, reporting rates in seconds and times in milliseconds. + */ + public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> { + private List<String> keys = Lists.newArrayList(); + + protected Builder(MetricContext context) { + super(context); + } + + public T withKeys(List<String> keys) { + this.keys = keys; + return self(); + } + + /** + * Builds and returns {@link KafkaAvroEventReporter}. + * + * @param brokers string of Kafka brokers + * @param topic topic to send metrics to + * @return KafkaAvroReporter + */ + public KafkaEventKeyValueReporter build(String brokers, String topic) throws IOException { + this.brokers = brokers; + this.topic = topic; + return new KafkaEventKeyValueReporter(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java index 804d909..8c5d3be 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java @@ -40,7 +40,7 @@ import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; public class KafkaEventReporter extends EventReporter { protected final AvroSerializer<GobblinTrackingEvent> serializer; - private final Pusher kafkaPusher; + protected final Pusher kafkaPusher; public KafkaEventReporter(Builder<?> builder) throws IOException { super(builder); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java index 4dcf717..be95258 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java @@ -68,14 +68,14 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory { String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS); - String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT, + String metricsReportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT); KafkaReportingFormats formatEnum; try { - formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase()); + formatEnum = KafkaReportingFormats.valueOf(metricsReportingFormat.toUpperCase()); } catch (IllegalArgumentException exception) { - log.warn("Kafka metrics reporting format " + reportingFormat + + log.warn("Kafka metrics reporting format " + metricsReportingFormat + " not recognized. Will report in json format.", exception); formatEnum = KafkaReportingFormats.JSON; } @@ -89,9 +89,24 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory { } } + KafkaReportingFormats eventFormatEnum; + if (properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT)) { + String eventsReportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT); + try { + eventFormatEnum = KafkaReportingFormats.valueOf(eventsReportingFormat.toUpperCase()); + } catch (IllegalArgumentException exception) { + log.warn("Kafka events reporting format " + eventsReportingFormat + " not recognized. Will report in json format.", + exception); + eventFormatEnum = KafkaReportingFormats.JSON; + } + } else { + eventFormatEnum = formatEnum; + } + if (eventsEnabled) { try { - KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(), + KafkaEventReporter.Builder<?> builder = eventFormatEnum.eventReporterBuilder(RootMetricContext.get(), properties); Config allConfig = ConfigUtils.propertiesToConfig(properties); @@ -103,8 +118,11 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory { builder.withConfig(kafkaConfig); - builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, - PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME)); + String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) + ? properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) + : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, + PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME); + builder.withPusherClassName(pusherClassName); return builder.build(brokers, eventsTopic.or(defaultTopic).get()); } catch (IOException exception) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java index a76c750..0d9b2a5 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java @@ -24,6 +24,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; public class PusherUtils { public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = "metrics.reporting.kafka.config"; public static final String KAFKA_PUSHER_CLASS_NAME_KEY = "metrics.reporting.kafkaPusherClass"; + public static final String KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS = "metrics.reporting.events.kafkaPusherClass"; public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = "org.apache.gobblin.metrics.kafka.KafkaPusher"; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java new file mode 100644 index 0000000..d4e5939 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.reporter; + +import com.google.common.collect.Maps; +import org.apache.commons.lang3.tuple.Pair; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter; +import org.apache.gobblin.metrics.kafka.KafkaEventReporter; +import org.apache.gobblin.metrics.kafka.Pusher; +import org.apache.gobblin.metrics.reporter.util.EventUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import com.google.common.collect.Lists; + + +public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTest { + + @Override + public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context, + Pusher pusher) { + KafkaAvroEventKeyValueReporter.Builder<?> builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context); + return builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList("k1", "k2", "k3")); + } + + private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, byte[]>> it) throws IOException { + Assert.assertTrue(it.hasNext()); + Pair<String, byte[]> event = it.next(); + return Pair.of(event.getKey(), EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), event.getValue())); + } + + @Test + public void testKafkaEventReporter() throws IOException { + MetricContext context = MetricContext.builder("context").build(); + + MockKafkaKeyValuePusher pusher = new MockKafkaKeyValuePusher(); + KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic"); + + String namespace = "gobblin.metrics.test"; + String eventName = "testEvent"; + + GobblinTrackingEvent event = new GobblinTrackingEvent(); + event.setName(eventName); + event.setNamespace(namespace); + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("m1", "v1"); + metadata.put("m2", null); + event.setMetadata(metadata); + context.submitEvent(event); + + kafkaReporter.report(); + + Pair<String, GobblinTrackingEvent> retrievedEvent = nextKVEvent(pusher.messageIterator()); + Assert.assertNull(retrievedEvent.getKey()); + + event = new GobblinTrackingEvent(); + event.setName(eventName); + event.setNamespace(namespace); + metadata = Maps.newHashMap(); + metadata.put("k1", "v1"); + metadata.put("k2", "v2"); + metadata.put("k3", "v3"); + event.setMetadata(metadata); + context.submitEvent(event); + + kafkaReporter.report(); + + retrievedEvent = nextKVEvent(pusher.messageIterator()); + Assert.assertEquals(retrievedEvent.getKey(), "v1v2v3"); + } + + @Test (enabled=false) + public void testTagInjection() throws IOException { + // This test is not applicable for testing KafkaAvroEventKeyValueReporter + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b2dd7ddf/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java new file mode 100644 index 0000000..adbaf57 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaKeyValuePusher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.reporter; + +import com.google.common.collect.Queues; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.gobblin.metrics.kafka.Pusher; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +/** + * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used to test {@link org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter}. + */ +public class MockKafkaKeyValuePusher<K, V> implements Pusher<Pair<K, V>> { + + Queue<Pair<K, V>> messages = Queues.newLinkedBlockingQueue(); + + @Override + public void pushMessages(List<Pair<K, V>> messages) { + this.messages.clear(); + this.messages.addAll(messages); + } + + @Override + public void close() throws IOException { + } + + public Iterator<Pair<K, V>> messageIterator() { + return this.messages.iterator(); + } +}
