Repository: incubator-gobblin Updated Branches: refs/heads/master 90d8495ae -> ee770f5c5
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java new file mode 100644 index 0000000..1c935b4 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.Properties; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.MetricReport; +import org.apache.gobblin.metrics.reporter.MetricReportReporter; +import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer; +import org.apache.gobblin.metrics.reporter.util.AvroSerializer; +import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; +import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Kafka reporter for metrics. + * + * @author ibuenros + */ +@Slf4j +public class KafkaReporter extends MetricReportReporter { + public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType"; + private static final String METRICS_KAFKA_PREFIX = "metrics.kafka"; + + protected final AvroSerializer<MetricReport> serializer; + protected final Pusher kafkaPusher; + + protected KafkaReporter(Builder<?> builder, Config config) throws IOException { + super(builder, config); + + SchemaVersionWriter versionWriter; + if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) { + try { + ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class); + Class<? 
extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE)); + versionWriter = klazz.newInstance(); + } catch (ReflectiveOperationException roe) { + throw new IOException("Could not instantiate version writer.", roe); + } + } else { + versionWriter = new FixedSchemaVersionWriter(); + } + + log.info("Schema version writer: " + versionWriter.getClass().getName()); + this.serializer = this.closer.register(createSerializer(versionWriter)); + + if (builder.kafkaPusher.isPresent()) { + this.kafkaPusher = builder.kafkaPusher.get(); + } else { + Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX); + String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, + PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME); + + this.kafkaPusher = PusherUtils.getPusher(pusherClassName, builder.brokers, builder.topic, Optional.of(kafkaConfig)); + } + } + + protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException { + return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter); + } + + /** + * A static factory class for obtaining new {@link Builder}s + * + * @see Builder + */ + public static class BuilderFactory { + + public static BuilderImpl newBuilder() { + return new BuilderImpl(); + } + } + + public static class BuilderImpl extends Builder<BuilderImpl> { + + @Override + protected BuilderImpl self() { + return this; + } + } + + /** + * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds. 
+ */ + public static abstract class Builder<T extends MetricReportReporter.Builder<T>> + extends MetricReportReporter.Builder<T> { + + protected String brokers; + protected String topic; + protected Optional<Pusher> kafkaPusher; + + protected Builder() { + super(); + this.name = "KafkaReporter"; + this.kafkaPusher = Optional.absent(); + } + + /** + * Set {@link Pusher} to use. + */ + public T withKafkaPusher(Pusher pusher) { + this.kafkaPusher = Optional.of(pusher); + return self(); + } + + /** + * Builds and returns {@link KafkaReporter}. + * + * @param brokers string of Kafka brokers + * @param topic topic to send metrics to + * @return KafkaReporter + */ + public KafkaReporter build(String brokers, String topic, Properties props) throws IOException { + this.brokers = brokers; + this.topic = topic; + + return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))); + } + } + + @Override + protected void emitReport(MetricReport report) { + this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java new file mode 100644 index 0000000..9faac33 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.Properties; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.CustomCodahaleReporterFactory; +import org.apache.gobblin.metrics.KafkaReportingFormats; +import org.apache.gobblin.metrics.RootMetricContext; +import org.apache.gobblin.util.ConfigUtils; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class KafkaReporterFactory implements CustomCodahaleReporterFactory { + @Override + public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) + throws IOException { + if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) { + return null; + } + log.info("Reporting metrics to Kafka"); + + Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC)); + Optional<String> metricsTopic = Optional.fromNullable( + 
properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS)); + Optional<String> eventsTopic = Optional.fromNullable( + properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS)); + + boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent(); + if (metricsEnabled) log.info("Reporting metrics to Kafka"); + boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent(); + if (eventsEnabled) log.info("Reporting events to Kafka"); + + try { + Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS), + "Kafka metrics brokers missing."); + Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing."); + } catch (IllegalArgumentException exception) { + log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception); + return null; + } + + String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS); + + String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT); + + KafkaReportingFormats formatEnum; + try { + formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase()); + } catch (IllegalArgumentException exception) { + log.warn("Kafka metrics reporting format " + reportingFormat + + " not recognized. Will report in json format.", exception); + formatEnum = KafkaReportingFormats.JSON; + } + + if (metricsEnabled) { + try { + formatEnum.metricReporterBuilder(properties) + .build(brokers, metricsTopic.or(defaultTopic).get(), properties); + } catch (IOException exception) { + log.error("Failed to create Kafka metrics reporter. 
Will not report metrics to Kafka.", exception); + } + } + + if (eventsEnabled) { + try { + KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(), + properties); + + Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties), + PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX); + builder.withConfig(kafkaConfig); + + builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, + PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME)); + + return builder.build(brokers, eventsTopic.or(defaultTopic).get()); + } catch (IOException exception) { + log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception); + } + } + + log.info("Will start reporting metrics to Kafka"); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java new file mode 100644 index 0000000..5abd503 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.Closeable; +import java.util.List; + + +/** + * Establish a connection to a Kafka cluster and push byte messages to a specified topic. + */ +public interface Pusher extends Closeable { + /** + * Push all byte array messages to the Kafka topic. + * @param messages List of byte array messages to push to Kafka. + */ + void pushMessages(List<byte[]> messages); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java new file mode 100644 index 0000000..a76c750 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.metrics.kafka; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + +public class PusherUtils { + public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = "metrics.reporting.kafka.config"; + public static final String KAFKA_PUSHER_CLASS_NAME_KEY = "metrics.reporting.kafkaPusherClass"; + public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = "org.apache.gobblin.metrics.kafka.KafkaPusher"; + + /** + * Create a {@link Pusher} + * @param pusherClassName the {@link Pusher} class to instantiate + * @param brokers brokers to connect to + * @param topic the topic to write to + * @param config additional configuration for configuring the {@link Pusher} + * @return a {@link Pusher} + */ + public static Pusher getPusher(String pusherClassName, String brokers, String topic, Optional<Config> config) { + try { + Class<?> pusherClass = Class.forName(pusherClassName); + + return (Pusher) GobblinConstructorUtils.invokeLongestConstructor(pusherClass, + brokers, topic, config); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Could not instantiate kafka pusher", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java new file mode 100644 index 0000000..e240a53 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.reporter; + +import java.io.IOException; +import java.util.Iterator; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter; +import org.apache.gobblin.metrics.kafka.KafkaEventReporter; +import org.apache.gobblin.metrics.kafka.Pusher; +import org.apache.gobblin.metrics.reporter.util.EventUtils; + + +@Test(groups = {"gobblin.metrics"}) +public class KafkaAvroEventReporterTest extends KafkaEventReporterTest { + + @Override + public KafkaEventReporter.Builder<? 
extends KafkaEventReporter.Builder> getBuilder(MetricContext context, + Pusher pusher) { + return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher); + } + + @Override + @SuppressWarnings("unchecked") + protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) + throws IOException { + Assert.assertTrue(it.hasNext()); + return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java new file mode 100644 index 0000000..e7be31d --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.reporter; + +import java.io.IOException; +import java.util.Iterator; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.metrics.MetricReport; +import org.apache.gobblin.metrics.kafka.KafkaAvroReporter; +import org.apache.gobblin.metrics.kafka.KafkaReporter; +import org.apache.gobblin.metrics.kafka.Pusher; +import org.apache.gobblin.metrics.reporter.util.MetricReportUtils; + +/** + * Test for KafkaAvroReporter + * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer + * + * @author ibuenros + */ +@Test(groups = {"gobblin.metrics"}) +public class KafkaAvroReporterTest extends KafkaReporterTest { + + public KafkaAvroReporterTest(String topic) + throws IOException, InterruptedException { + super(); + } + + public KafkaAvroReporterTest() throws IOException, InterruptedException { + this("KafkaAvroReporterTest"); + } + + @Override + public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) { + return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher); + } + + @Override + public KafkaReporter.Builder<? 
extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) { + return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher); + } + + @Override + @SuppressWarnings("unchecked") + protected MetricReport nextReport(Iterator<byte[]> it) + throws IOException { + Assert.assertTrue(it.hasNext()); + return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java new file mode 100644 index 0000000..add42f4 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.reporter; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; + +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.metrics.kafka.KafkaEventReporter; +import org.apache.gobblin.metrics.kafka.Pusher; +import org.apache.gobblin.metrics.reporter.util.EventUtils; + + +@Test(groups = {"gobblin.metrics"}) +public class KafkaEventReporterTest { + + /** + * Get builder for KafkaEventReporter (override if testing an extension of KafkaEventReporter) + * @param context metric context the builder is created for + * @param pusher {@link Pusher} set on the builder + * @return KafkaEventReporter builder + */ + public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context, + Pusher pusher) { + return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher); + } + + + @Test + public void testKafkaEventReporter() throws IOException { + MetricContext context = MetricContext.builder("context").build(); + + MockKafkaPusher pusher = new MockKafkaPusher(); + KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic"); + + String namespace = "gobblin.metrics.test"; + String eventName = "testEvent"; + + GobblinTrackingEvent event = new GobblinTrackingEvent(); + event.setName(eventName); + event.setNamespace(namespace); + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("m1", "v1"); + metadata.put("m2", null); + event.setMetadata(metadata); + context.submitEvent(event); + + try { + Thread.sleep(100); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + kafkaReporter.report(); + + try { + Thread.sleep(100); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + GobblinTrackingEvent retrievedEvent = 
nextEvent(pusher.messageIterator()); + Assert.assertEquals(retrievedEvent.getNamespace(), namespace); + Assert.assertEquals(retrievedEvent.getName(), eventName); + Assert.assertEquals(retrievedEvent.getMetadata().size(), 4); + + } + + @Test + public void testTagInjection() throws IOException { + + String tag1 = "tag1"; + String value1 = "value1"; + String metadataValue1 = "metadata1"; + String tag2 = "tag2"; + String value2 = "value2"; + + MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1, value1)). + addTag(new Tag<String>(tag2, value2)).build(); + + MockKafkaPusher pusher = new MockKafkaPusher(); + KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic"); + + String namespace = "gobblin.metrics.test"; + String eventName = "testEvent"; + + GobblinTrackingEvent event = new GobblinTrackingEvent(); + event.setName(eventName); + event.setNamespace(namespace); + Map<String, String> metadata = Maps.newHashMap(); + metadata.put(tag1, metadataValue1); + event.setMetadata(metadata); + context.submitEvent(event); + + try { + Thread.sleep(100); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + kafkaReporter.report(); + + try { + Thread.sleep(100); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator()); + Assert.assertEquals(retrievedEvent.getNamespace(), namespace); + Assert.assertEquals(retrievedEvent.getName(), eventName); + Assert.assertEquals(retrievedEvent.getMetadata().size(), 4); + Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1); + Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2); + } + + /** + * Extract the next metric from the Kafka iterator + * Assumes existence of the metric has already been checked. 
+ * @param it Kafka ConsumerIterator + * @return next metric in the stream + * @throws IOException + */ + protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException { + Assert.assertTrue(it.hasNext()); + return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java new file mode 100644 index 0000000..f653dd9 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gobblin.metrics.reporter;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.collect.Lists;

import org.apache.gobblin.metrics.Measurements;
import org.apache.gobblin.metrics.Metric;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.MetricReport;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.kafka.KafkaReporter;
import org.apache.gobblin.metrics.kafka.Pusher;
import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;


/**
 * Tests for {@link KafkaReporter}. The reporter is built on top of a {@link MockKafkaPusher}, and the
 * tests inspect the serialized {@link MetricReport}s that the pusher recorded.
 *
 * <p>The builder factories and {@link #nextReport(Iterator)} are overridable so that tests of
 * {@link KafkaReporter} extensions can reuse the test methods defined here.
 */
@Test(groups = { "gobblin.metrics" })
public class KafkaReporterTest {

  /** How long to wait after triggering a report before reading the pusher, in milliseconds. */
  private static final long REPORT_WAIT_MILLIS = 1000L;

  public KafkaReporterTest() throws IOException, InterruptedException {}

  /**
   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
   * @param pusher {@link Pusher} the built reporter should hand its messages to
   * @return KafkaReporter builder
   */
  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
  }

  /**
   * Get builder for the reporter used by {@link #kafkaReporterContextTest()}. In this class it is built
   * exactly like {@link #getBuilder(Pusher)}; override if testing an extension of KafkaReporter.
   * @param pusher {@link Pusher} the built reporter should hand its messages to
   * @return KafkaReporter builder
   */
  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
  }

  /**
   * Reports a counter, a meter and a histogram twice: the first report is checked for the expected
   * counts, the most recent report is checked (strictly) for the full set of expected metric names.
   */
  @Test
  public void testKafkaReporter() throws IOException {
    MetricContext metricContext =
        MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build();
    Counter counter = metricContext.counter("com.linkedin.example.counter");
    Meter meter = metricContext.meter("com.linkedin.example.meter");
    Histogram histogram = metricContext.histogram("com.linkedin.example.histogram");

    MockKafkaPusher pusher = new MockKafkaPusher();
    KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new Properties());

    // Close the reporter even when an assertion fails.
    try {
      counter.inc();
      meter.mark(2);
      histogram.update(1);
      histogram.update(1);
      histogram.update(2);

      kafkaReporter.report(metricContext);
      waitForReport();

      Map<String, Double> expected = new HashMap<>();
      expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0);
      expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0);
      expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0);

      expectMetricsWithValues(nextReport(pusher.messageIterator()), expected);

      kafkaReporter.report(metricContext);
      waitForReport();

      Set<String> expectedSet = new HashSet<>();
      expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT);
      expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT);
      expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
      expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
      expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
      expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
      expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT);

      // MockKafkaPusher keeps every message and messageIterator() always starts at the oldest one,
      // so reading "the next report" from a fresh iterator would return the first report again.
      // Walk to the newest message instead.
      expectMetrics(latestReport(pusher.messageIterator()), expectedSet, true);
    } finally {
      kafkaReporter.close();
    }
  }

  /**
   * Checks that tags given to the reporter builder show up in the emitted report.
   */
  @Test
  public void kafkaReporterTagsTest() throws IOException {
    MetricContext metricContext =
        MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
    Counter counter = metricContext.counter("com.linkedin.example.counter");

    Tag<?> tag1 = new Tag<>("tag1", "value1");
    Tag<?> tag2 = new Tag<>("tag2", 2);

    MockKafkaPusher pusher = new MockKafkaPusher();
    KafkaReporter kafkaReporter =
        getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000", "topic", new Properties());

    try {
      counter.inc();

      kafkaReporter.report(metricContext);
      waitForReport();

      MetricReport metricReport = nextReport(pusher.messageIterator());
      Map<String, String> reportedTags = metricReport.getTags();

      // Two of the four tags are the ones set above.
      // NOTE(review): the other two are presumably added automatically by the context/reporter - confirm.
      Assert.assertEquals(reportedTags.size(), 4);
      Assert.assertTrue(reportedTags.containsKey(tag1.getKey()));
      Assert.assertEquals(reportedTags.get(tag1.getKey()), tag1.getValue().toString());
      Assert.assertTrue(reportedTags.containsKey(tag2.getKey()));
      Assert.assertEquals(reportedTags.get(tag2.getKey()), tag2.getValue().toString());
    } finally {
      kafkaReporter.close();
    }
  }

  /**
   * Checks that a tag attached to the {@link MetricContext} itself shows up in the emitted report.
   */
  @Test
  public void kafkaReporterContextTest() throws IOException {
    Tag<?> tag1 = new Tag<>("tag1", "value1");
    MetricContext context = MetricContext.builder("context").addTag(tag1).build();
    Counter counter = context.counter("com.linkedin.example.counter");

    MockKafkaPusher pusher = new MockKafkaPusher();
    KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic", new Properties());

    try {
      counter.inc();

      kafkaReporter.report(context);
      waitForReport();

      MetricReport metricReport = nextReport(pusher.messageIterator());
      Map<String, String> reportedTags = metricReport.getTags();

      // One of the three tags is the context tag set above.
      // NOTE(review): the other two are presumably added automatically by the context/reporter - confirm.
      Assert.assertEquals(reportedTags.size(), 3);
      Assert.assertTrue(reportedTags.containsKey(tag1.getKey()));
      Assert.assertEquals(reportedTags.get(tag1.getKey()), tag1.getValue().toString());
    } finally {
      kafkaReporter.close();
    }
  }

  /**
   * Pauses for {@link #REPORT_WAIT_MILLIS} after a report has been triggered. If the thread is
   * interrupted, the interrupt flag is restored and the method returns early.
   * NOTE(review): with {@link MockKafkaPusher} the hand-off may well be synchronous, in which case
   * this wait is unnecessary - confirm before removing.
   */
  private static void waitForReport() {
    try {
      Thread.sleep(REPORT_WAIT_MILLIS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Expect a list of metrics with specific values.
   * Fail if not all metrics are received, or some metric has the wrong value.
   * Entries are removed from {@code expected} as they are matched.
   * @param report MetricReport.
   * @param expected map of expected metric names and their values
   */
  private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected) {
    for (Metric metric : report.getMetrics()) {
      String name = metric.getName();
      if (expected.containsKey(name)) {
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(metric.getValue(), expected.get(name));
        expected.remove(name);
      }
    }

    Assert.assertTrue(expected.isEmpty(), "Expected metrics missing from report: " + expected.keySet());
  }

  /**
   * Expect a set of metric names. Will fail if not all of these metrics are received.
   * Names are removed from {@code expected} as they are matched.
   * @param report MetricReport
   * @param expected set of expected metric names
   * @param strict if set to true, will fail if receiving any metric that is not expected (metrics whose
   *               name contains {@link MetricContext#GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME} are tolerated)
   */
  private void expectMetrics(MetricReport report, Set<String> expected, boolean strict) {
    for (Metric metric : report.getMetrics()) {
      String name = metric.getName();
      boolean wasExpected = expected.remove(name);
      if (!wasExpected && strict && !name.contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME)) {
        Assert.fail("Metric present in report not expected: " + metric.toString());
      }
    }
    Assert.assertTrue(expected.isEmpty(), "Expected metrics missing from report: " + expected);
  }

  /**
   * Reads every remaining message from the iterator with {@link #nextReport(Iterator)} and returns the
   * last one, i.e. the most recently pushed report. Fails if the iterator has no message at all.
   * @param it iterator over the serialized messages recorded by the pusher
   * @return the last report in the stream
   * @throws IOException if a message cannot be deserialized
   */
  private MetricReport latestReport(Iterator<byte[]> it) throws IOException {
    MetricReport latest = nextReport(it);
    while (it.hasNext()) {
      latest = nextReport(it);
    }
    return latest;
  }

  /**
   * Extract the next metric report from the message iterator.
   * Fails the test if the iterator has no further message.
   * @param it iterator over the serialized messages recorded by the pusher
   * @return next metric report in the stream, deserialized with
   *         {@link MetricReportUtils#deserializeReportFromJson}
   * @throws IOException if the message cannot be deserialized
   */
  protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
    Assert.assertTrue(it.hasNext());
    return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java new file mode 100644 index 0000000..71decbb --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gobblin.metrics.reporter;


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

import com.google.common.collect.Queues;

import org.apache.gobblin.metrics.kafka.Pusher;


/**
 * Test double for {@link org.apache.gobblin.metrics.kafka.Pusher}: instead of sending messages anywhere,
 * it records them in memory so that tests can read them back through {@link #messageIterator()}.
 */
public class MockKafkaPusher implements Pusher {

  /** Every message handed to {@link #pushMessages(List)}; this class only ever appends to it. */
  Queue<byte[]> messages = Queues.newLinkedBlockingQueue();

  public MockKafkaPusher() {
  }

  /**
   * Records the given messages, in list order, at the end of the internal queue.
   * @param messages serialized messages to record
   */
  @Override
  public void pushMessages(List<byte[]> messages) {
    for (byte[] message : messages) {
      this.messages.add(message);
    }
  }

  /**
   * Does nothing; the mock holds no resources to release.
   */
  @Override
  public void close()
      throws IOException {
  }

  /**
   * Returns a new iterator over the recorded messages. Iterating does not remove messages, so every
   * call starts again from the oldest recorded message.
   * @return iterator over the recorded messages
   */
  public Iterator<byte[]> messageIterator() {
    Queue<byte[]> recorded = this.messages;
    return recorded.iterator();
  }

}
